Merge branch 'slf/otp-refactoring-step1' (merge comments follow)

This finishes the first stage of making an OTP-style application
out of the `prototype/demo-day` code. The process structure is not
fully OTP compliant. I'm not sure if I really want it to be 100%
OTP style, but that decision can be deferred for a little while
yet.

There are probably "bugs" with brick shutdown, such as process
leaks. That ought to be fixed someday. The use of the Erlang process
registry for finding writer/sequencer processes is nifty (for a
quick hack), but it also leaks atoms (not good for long-term use).
This commit is contained in:
Scott Lystig Fritchie 2015-04-06 12:21:07 +09:00
commit d2c2929084
27 changed files with 3656 additions and 3 deletions

8
.gitignore vendored
View file

@ -1,9 +1,11 @@
prototype/chain-manager/patch.*
.eunit
deps
*.o
ebin/*.beam
*.plt
erl_crash.dump
rel/example_project
.concrete/DEV_MODE
.rebar
doc/edoc-info
doc/erlang.png
doc/*.html
doc/stylesheet.css

41
Makefile Normal file
View file

@ -0,0 +1,41 @@
REBAR_BIN := $(shell which rebar)
ifeq ($(REBAR_BIN),)
REBAR_BIN = ./rebar
endif
.PHONY: rel deps package pkgclean
all: deps compile
compile:
$(REBAR_BIN) compile
deps:
$(REBAR_BIN) get-deps
clean:
$(REBAR_BIN) -r clean
test: deps compile eunit
eunit:
$(REBAR_BIN) -v skip_deps=true eunit
pulse: compile
env USE_PULSE=1 $(REBAR_BIN) skip_deps=true clean compile
env USE_PULSE=1 $(REBAR_BIN) skip_deps=true -D PULSE eunit
APPS = kernel stdlib sasl erts ssl compiler eunit crypto
PLT = $(HOME)/.machi_dialyzer_plt
build_plt: deps compile
dialyzer --build_plt --output_plt $(PLT) --apps $(APPS) deps/*/ebin
dialyzer: deps compile
dialyzer -Wno_return --plt $(PLT) ebin
dialyzer-test: deps compile
dialyzer -Wno_return --plt $(PLT) .eunit
clean_plt:
rm $(PLT)

37
TODO-shortterm.org Normal file
View file

@ -0,0 +1,37 @@
* To Do list
** DONE remove the escript* stuff from machi_util.erl
** DONE Add functions to manipulate 1-chain projections
- Add epoch ID = epoch number + checksum of projection!
Done via compare() func.
** DONE Change all protocol ops to add epoch ID
** TODO Add projection store to each FLU.
*** DONE What should the API look like? (borrow from chain mgr PoC?)
Yeah, I think that's pretty complete. Steal it now, worry later.
*** DONE Choose protocol & TCP port. Share with get/put? Separate?
Hrm, I like the idea of having a single TCP port to talk to any single
FLU.
To make the protocol "easy" to hack, how about using the same basic
method as append/write where there's a variable size blob. But we'll
format that blob as a term_to_binary(). Then dispatch to a single
func, and pattern match Erlang style in that func.
*** TODO Do it.
** TODO Change all protocol ops to enforce the epoch ID
** TODO Add projection wedging logic to each FLU.
- Add no-wedging state to make testing easier?
** TODO Move prototype/chain-manager code to "top" of source tree
*** TODO Preserve current test code (leave as-is? tiny changes?)
*** TODO Make chain manager code flexible enough to run "real world" or "sim"
** TODO Replace registered name use from FLU write/append dispatcher
** TODO Move the FLU server to gen_server behavior?

2
ebin/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
*.beam
*.app

26
include/machi.hrl Normal file
View file

@ -0,0 +1,26 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-define(MAX_FILE_SIZE, 256*1024*1024). % 256 MBytes
-define(MAX_CHUNK_SIZE, ((1 bsl 32) - 1)).
%% -define(DATA_DIR, "/Volumes/SAM1/seq-tests/data").
-define(DATA_DIR, "./data").
-define(MINIMUM_OFFSET, 1024).

View file

@ -0,0 +1,53 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-type pv1_csum() :: binary().
-type pv1_epoch() :: {pv1_epoch_n(), pv1_csum()}.
-type pv1_epoch_n() :: non_neg_integer().
-type pv1_server() :: atom() | binary().
-type pv1_timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}.
-define(DUMMY_PV1_EPOCH, {0,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>}).
-record(projection_v1, {
epoch_number :: pv1_epoch_n(),
epoch_csum :: pv1_csum(),
all_members :: [pv1_server()],
member_dict :: orddict:orddict(),
down :: [pv1_server()],
creation_time :: pv1_timestamp(),
author_server :: pv1_server(),
upi :: [pv1_server()],
repairing :: [pv1_server()],
dbg :: list(), %proplist(), is checksummed
dbg2 :: list() %proplist(), is not checksummed
}).
-define(MACHI_DEFAULT_TCP_PORT, 50000).
-record(p_srvr, {
name :: pv1_server(),
proto = 'ipv4' :: 'ipv4' | 'disterl', % disterl? Hrm.
address :: term(), % Protocol-specific
port :: term(), % Protocol-specific
props = [] :: list() % proplist for other related info
}).
-define(SHA_MAX, (1 bsl (20*8))).

BIN
rebar vendored Executable file

Binary file not shown.

7
rebar.config Normal file
View file

@ -0,0 +1,7 @@
%%% {erl_opts, [warnings_as_errors, {parse_transform, lager_transform}, debug_info]}.
{erl_opts, [{parse_transform, lager_transform}, debug_info]}.
{deps, [
{lager, ".*", {git, "git://github.com/basho/lager.git", {tag, "2.0.1"}}}
]}.

47
rebar.config.script Normal file
View file

@ -0,0 +1,47 @@
PulseBuild = case os:getenv("USE_PULSE") of
false ->
false;
_ ->
true
end,
case PulseBuild of
true ->
PulseOpts =
[{pulse_no_side_effect,
[{erlang,display,1}
]},
{pulse_side_effect,
[ {does_not_exist_yet, some_func, '_'}
, {prim_file, '_', '_'}
, {file, '_', '_'}
, {filelib, '_', '_'}
, {os, '_', '_'} ]},
{pulse_replace_module,
[ {gen_server, pulse_gen_server}
, {application, pulse_application}
, {supervisor, pulse_supervisor} ]}
],
PulseCFlags = [{"CFLAGS", "$CFLAGS -DPULSE"}],
UpdConfig = case lists:keysearch(eunit_compile_opts, 1, CONFIG) of
{value, {eunit_compile_opts, Opts}} ->
lists:keyreplace(eunit_compile_opts,
1,
CONFIG,
{eunit_compile_opts, Opts ++ PulseOpts});
_ ->
[{eunit_compile_opts, PulseOpts} | CONFIG]
end,
case lists:keysearch(port_env, 1, UpdConfig) of
{value, {port_env, PortEnv}} ->
lists:keyreplace(port_env,
1,
UpdConfig,
{port_env, PortEnv ++ PulseCFlags});
_ ->
[{port_env, PulseCFlags} | UpdConfig]
end;
false ->
CONFIG
end.

13
src/machi.app.src Normal file
View file

@ -0,0 +1,13 @@
{application, machi, [
{description, "A village of write-once files."},
{vsn, "0.0.0"},
{applications, [kernel, stdlib, sasl, crypto]},
{mod,{machi_app,[]}},
{registered, []},
{env, [
{flu_list,
[
{flu_a, 32900, "./data.flu_a"}
]}
]}
]}.

126
src/machi_admin_util.erl Normal file
View file

@ -0,0 +1,126 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_admin_util).
%% TODO Move these types to a common header file? (also machi_flu1_client.erl?)
-type inet_host() :: inet:ip_address() | inet:hostname().
-type inet_port() :: inet:port_number().
-export([
verify_file_checksums_local/3, verify_file_checksums_local/4,
verify_file_checksums_remote/3, verify_file_checksums_remote/4
]).
-compile(export_all).
-include("machi.hrl").
-include("machi_projection.hrl").
-define(FLU_C, machi_flu1_client).
-spec verify_file_checksums_local(port(), machi_flu1_client:epoch_id(), binary()|list()) ->
{ok, [tuple()]} | {error, term()}.
verify_file_checksums_local(Sock1, EpochID, Path) when is_port(Sock1) ->
verify_file_checksums_local2(Sock1, EpochID, Path).
-spec verify_file_checksums_local(inet_host(), inet_port(),
machi_flu1_client:epoch_id(), binary()|list()) ->
{ok, [tuple()]} | {error, term()}.
verify_file_checksums_local(Host, TcpPort, EpochID, Path) ->
Sock1 = machi_util:connect(Host, TcpPort),
try
verify_file_checksums_local2(Sock1, EpochID, Path)
after
catch gen_tcp:close(Sock1)
end.
-spec verify_file_checksums_remote(port(), machi_flu1_client:epoch_id(), binary()|list()) ->
{ok, [tuple()]} | {error, term()}.
verify_file_checksums_remote(Sock1, EpochID, File) when is_port(Sock1) ->
verify_file_checksums_remote2(Sock1, EpochID, File).
-spec verify_file_checksums_remote(inet_host(), inet_port(),
machi_flu1_client:epoch_id(), binary()|list()) ->
{ok, [tuple()]} | {error, term()}.
verify_file_checksums_remote(Host, TcpPort, EpochID, File) ->
Sock1 = machi_util:connect(Host, TcpPort),
try
verify_file_checksums_remote2(Sock1, EpochID, File)
after
catch gen_tcp:close(Sock1)
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%
verify_file_checksums_local2(Sock1, EpochID, Path0) ->
Path = machi_util:make_string(Path0),
case file:open(Path, [read, binary, raw]) of
{ok, FH} ->
File = re:replace(Path, ".*/", "", [{return, binary}]),
try
ReadChunk = fun(_File, Offset, Size) ->
file:pread(FH, Offset, Size)
end,
verify_file_checksums_common(Sock1, EpochID, File, ReadChunk)
after
file:close(FH)
end;
Else ->
Else
end.
verify_file_checksums_remote2(Sock1, EpochID, File) ->
ReadChunk = fun(File_name, Offset, Size) ->
?FLU_C:read_chunk(Sock1, EpochID,
File_name, Offset, Size)
end,
verify_file_checksums_common(Sock1, EpochID, File, ReadChunk).
verify_file_checksums_common(Sock1, EpochID, File, ReadChunk) ->
try
case ?FLU_C:checksum_list(Sock1, EpochID, File) of
{ok, Info} ->
Res = lists:foldl(verify_chunk_checksum(File, ReadChunk),
[], Info),
{ok, Res};
{error, no_such_file}=Nope ->
Nope;
{error, _}=Else ->
Else
end
catch
What:Why ->
{error, {What, Why, erlang:get_stacktrace()}}
end.
verify_chunk_checksum(File, ReadChunk) ->
fun({Offset, Size, CSum}, Acc) ->
case ReadChunk(File, Offset, Size) of
{ok, Chunk} ->
CSum2 = machi_util:checksum(Chunk),
if CSum == CSum2 ->
Acc;
true ->
[{Offset, Size, File, CSum, now, CSum2}|Acc]
end;
_Else ->
[{Offset, Size, File, CSum, now, read_failure}|Acc]
end
end.

37
src/machi_app.erl Normal file
View file

@ -0,0 +1,37 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_app).
-behaviour(application).
%% Application callbacks
-export([start/2, stop/1]).
start(_StartType, _StartArgs) ->
case machi_sup:start_link() of
{ok, Pid} ->
{ok, Pid};
Error ->
Error
end.
stop(_State) ->
ok.

459
src/machi_chash.erl Normal file
View file

@ -0,0 +1,459 @@
%%%-------------------------------------------------------------------
%%% Copyright (c) 2007-2011 Gemini Mobile Technologies, Inc. All rights reserved.
%%% Copyright (c) 2013-2015 Basho Technologies, Inc. All rights reserved.
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
%%% You may obtain a copy of the License at
%%%
%%% http://www.apache.org/licenses/LICENSE-2.0
%%%
%%% Unless required by applicable law or agreed to in writing, software
%%% distributed under the License is distributed on an "AS IS" BASIS,
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% See the License for the specific language governing permissions and
%%% limitations under the License.
%%%
%%%-------------------------------------------------------------------
%% Consistent hashing library. Also known as "random slicing".
%% Originally from the Hibari DB source code at https://github.com/hibari
%%
%% TODO items:
%%
%% 1. Refactor to use bigints instead of floating point numbers. The
%% ?SMALLEST_SIGNIFICANT_FLOAT_SIZE macro below doesn't allow as
%% much wiggle-room for making really small hashing range
%% definitions.
-module(machi_chash).
-define(SMALLEST_SIGNIFICANT_FLOAT_SIZE, 0.1e-12).
-define(SHA_MAX, (1 bsl (20*8))).
%% -compile(export_all).
-export([make_float_map/1, make_float_map/2,
sum_map_weights/1,
make_tree/1,
query_tree/2,
hash_binary_via_float_map/2,
hash_binary_via_float_tree/2,
pretty_with_integers/2,
pretty_with_integers/3]).
-export([make_demo_map1/0, make_demo_map2/0]).
-export([zzz_usage_details/0]). % merely to give EDoc a hint of our intent
-type owner_name() :: term().
%% Owner for a range on the unit interval. We are agnostic about its
%% type.
-type weight() :: non_neg_integer().
%% For this library, a weight is an integer which specifies the
%% capacity of a "owner" relative to other owners. For example, if
%% owner A with a weight of 10, and if owner B has a weight of 20,
%% then B will be assigned twice as much of the unit interval as A.
-type float_map() :: [{owner_name(), float()}].
%% A float map subdivides the unit interval, starting at 0.0, to
%% partitions that are assigned to various owners. The sum of all
%% floats must be exactly 1.0 (or close enough for floating point
%% purposes).
-opaque float_tree() :: gb_trees:tree(float(), owner_name()).
%% We can't use gb_trees:tree() because 'nil' (the empty tree) is
%% never valid in our case. But teaching Dialyzer that is difficult.
-type owner_int_range() :: {owner_name(), non_neg_integer(), non_neg_integer()}.
%% Used when "prettying" a float map.
-type owner_weight() :: {owner_name(), weight()}.
-type owner_weight_list() :: [owner_weight()].
%% A owner_weight_list is a definition of brick assignments over the
%% unit interval [0.0, 1.0]. The sum of all floats must be 1.0. For
%% example, [{{br1,nd1}, 0.25}, {{br2,nd1}, 0.5}, {{br3,nd1}, 0.25}].
-export_type([float_map/0, float_tree/0]).
%% @doc Create a float map, based on a basic owner weight list.
-spec make_float_map(owner_weight_list()) -> float_map().
make_float_map(NewOwnerWeights) ->
make_float_map([], NewOwnerWeights).
%% @doc Create a float map, based on an older float map and a new weight
%% list.
%%
%% The weights in the new weight list may be different than (or the
%% same as) whatever weights were used to make the older float map.
-spec make_float_map(float_map(), owner_weight_list()) -> float_map().
make_float_map([], NewOwnerWeights) ->
Sum = add_all_weights(NewOwnerWeights),
DiffMap = [{Ch, Wt/Sum} || {Ch, Wt} <- NewOwnerWeights],
make_float_map2([{unused, 1.0}], DiffMap, NewOwnerWeights);
make_float_map(OldFloatMap, NewOwnerWeights) ->
NewSum = add_all_weights(NewOwnerWeights),
%% Normalize to unit interval
%% NewOwnerWeights2 = [{Ch, Wt / NewSum} || {Ch, Wt} <- NewOwnerWeights],
%% Reconstruct old owner weights (will be normalized to unit interval)
SumOldFloatsDict =
lists:foldl(fun({Ch, Wt}, OrdDict) ->
orddict:update_counter(Ch, Wt, OrdDict)
end, orddict:new(), OldFloatMap),
OldOwnerWeights = orddict:to_list(SumOldFloatsDict),
OldSum = add_all_weights(OldOwnerWeights),
OldChs = [Ch || {Ch, _} <- OldOwnerWeights],
NewChs = [Ch || {Ch, _} <- NewOwnerWeights],
OldChsOnly = OldChs -- NewChs,
%% Mark any space in by a deleted owner as unused.
OldFloatMap2 = lists:map(
fun({Ch, Wt} = ChWt) ->
case lists:member(Ch, OldChsOnly) of
true ->
{unused, Wt};
false ->
ChWt
end
end, OldFloatMap),
%% Create a diff map of changing owners and added owners
DiffMap = lists:map(fun({Ch, NewWt}) ->
case orddict:find(Ch, SumOldFloatsDict) of
{ok, OldWt} ->
{Ch, (NewWt / NewSum) -
(OldWt / OldSum)};
error ->
{Ch, NewWt / NewSum}
end
end, NewOwnerWeights),
make_float_map2(OldFloatMap2, DiffMap, NewOwnerWeights).
make_float_map2(OldFloatMap, DiffMap, _NewOwnerWeights) ->
FloatMap = apply_diffmap(DiffMap, OldFloatMap),
XX = combine_neighbors(collapse_unused_in_float_map(FloatMap)),
XX.
apply_diffmap(DiffMap, FloatMap) ->
SubtractDiff = [{Ch, abs(Diff)} || {Ch, Diff} <- DiffMap, Diff < 0],
AddDiff = [D || {_Ch, Diff} = D <- DiffMap, Diff > 0],
TmpFloatMap = iter_diffmap_subtract(SubtractDiff, FloatMap),
iter_diffmap_add(AddDiff, TmpFloatMap).
add_all_weights(OwnerWeights) ->
lists:foldl(fun({_Ch, Weight}, Sum) -> Sum + Weight end, 0.0, OwnerWeights).
iter_diffmap_subtract([{Ch, Diff}|T], FloatMap) ->
iter_diffmap_subtract(T, apply_diffmap_subtract(Ch, Diff, FloatMap));
iter_diffmap_subtract([], FloatMap) ->
FloatMap.
iter_diffmap_add([{Ch, Diff}|T], FloatMap) ->
iter_diffmap_add(T, apply_diffmap_add(Ch, Diff, FloatMap));
iter_diffmap_add([], FloatMap) ->
FloatMap.
apply_diffmap_subtract(Ch, Diff, [{Ch, Wt}|T]) ->
if Wt == Diff ->
[{unused, Wt}|T];
Wt > Diff ->
[{Ch, Wt - Diff}, {unused, Diff}|T];
Wt < Diff ->
[{unused, Wt}|apply_diffmap_subtract(Ch, Diff - Wt, T)]
end;
apply_diffmap_subtract(Ch, Diff, [H|T]) ->
[H|apply_diffmap_subtract(Ch, Diff, T)];
apply_diffmap_subtract(_Ch, _Diff, []) ->
[].
apply_diffmap_add(Ch, Diff, [{unused, Wt}|T]) ->
if Wt == Diff ->
[{Ch, Wt}|T];
Wt > Diff ->
[{Ch, Diff}, {unused, Wt - Diff}|T];
Wt < Diff ->
[{Ch, Wt}|apply_diffmap_add(Ch, Diff - Wt, T)]
end;
apply_diffmap_add(Ch, Diff, [H|T]) ->
[H|apply_diffmap_add(Ch, Diff, T)];
apply_diffmap_add(_Ch, _Diff, []) ->
[].
combine_neighbors([{Ch, Wt1}, {Ch, Wt2}|T]) ->
combine_neighbors([{Ch, Wt1 + Wt2}|T]);
combine_neighbors([H|T]) ->
[H|combine_neighbors(T)];
combine_neighbors([]) ->
[].
collapse_unused_in_float_map([{Ch, Wt1}, {unused, Wt2}|T]) ->
collapse_unused_in_float_map([{Ch, Wt1 + Wt2}|T]);
collapse_unused_in_float_map([{unused, _}] = L) ->
L; % Degenerate case only
collapse_unused_in_float_map([H|T]) ->
[H|collapse_unused_in_float_map(T)];
collapse_unused_in_float_map([]) ->
[].
chash_float_map_to_nextfloat_list(FloatMap) when length(FloatMap) > 0 ->
%% QuickCheck found a bug ... need to weed out stuff smaller than
%% ?SMALLEST_SIGNIFICANT_FLOAT_SIZE here.
FM1 = [P || {_X, Y} = P <- FloatMap, Y > ?SMALLEST_SIGNIFICANT_FLOAT_SIZE],
{_Sum, NFs0} = lists:foldl(fun({Name, Amount}, {Sum, List}) ->
{Sum+Amount, [{Sum+Amount, Name}|List]}
end, {0, []}, FM1),
lists:reverse(NFs0).
chash_nextfloat_list_to_gb_tree([]) ->
gb_trees:balance(gb_trees:from_orddict([]));
chash_nextfloat_list_to_gb_tree(NextFloatList) ->
{_FloatPos, Name} = lists:last(NextFloatList),
%% QuickCheck found a bug ... it really helps to add a catch-all item
%% at the far "right" of the list ... 42.0 is much greater than 1.0.
NFs = NextFloatList ++ [{42.0, Name}],
gb_trees:balance(gb_trees:from_orddict(orddict:from_list(NFs))).
-spec chash_gb_next(float(), float_tree()) -> {float(), owner_name()}.
chash_gb_next(X, {_, GbTree}) ->
chash_gb_next1(X, GbTree).
chash_gb_next1(X, {Key, Val, Left, _Right}) when X < Key ->
case chash_gb_next1(X, Left) of
nil ->
{Key, Val};
Res ->
Res
end;
chash_gb_next1(X, {Key, _Val, _Left, Right}) when X >= Key ->
chash_gb_next1(X, Right);
chash_gb_next1(_X, nil) ->
nil.
%% @doc Not used directly, but can give a developer an idea of how well
%% chash_float_map_to_nextfloat_list will do for a given value of Max.
%%
%% For example:
%% <verbatim>
%% NewFloatMap = make_float_map([{unused, 1.0}],
%% [{a,100}, {b, 100}, {c, 10}]),
%% ChashMap = chash_scale_to_int_interval(NewFloatMap, 100),
%% io:format("QQQ: int int = ~p\n", [ChashIntInterval]),
%% -> [{a,1,47},{b,48,94},{c,94,100}]
%% </verbatim>
%%
%% Interpretation: out of the 100 slots:
%% <ul>
%% <li> 'a' uses the slots 1-47 </li>
%% <li> 'b' uses the slots 48-94 </li>
%% <li> 'c' uses the slots 95-100 </li>
%% </ul>
chash_scale_to_int_interval(NewFloatMap, Max) ->
chash_scale_to_int_interval(NewFloatMap, 0, Max).
%% @type nextfloat_list() = list({float(), brick()}). A nextfloat_list
%% differs from a float_map in two respects: 1) nextfloat_list contains
%% tuples with the brick name in 2nd position, 2) the float() at each
%% position I_n > I_m, for all n, m such that n > m.
%% For example, a nextfloat_list of the float_map example above,
%% [{0.25, {br1, nd1}}, {0.75, {br2, nd1}}, {1.0, {br3, nd1}].
chash_scale_to_int_interval([{Ch, _Wt}], Cur, Max) ->
[{Ch, Cur, Max}];
chash_scale_to_int_interval([{Ch, Wt}|T], Cur, Max) ->
Int = trunc(Wt * Max),
[{Ch, Cur + 1, Cur + Int}|chash_scale_to_int_interval(T, Cur + Int, Max)].
%%%%%%%%%%%%%
%% @doc Make a pretty/human-friendly version of a float map that describes
%% integer ranges between 1 and `Scale'.
-spec pretty_with_integers(float_map(), integer()) -> [owner_int_range()].
pretty_with_integers(Map, Scale) ->
chash_scale_to_int_interval(Map, Scale).
%% @doc Make a pretty/human-friendly version of a float map (based
%% upon a float map created from `OldWeights' and `NewWeights') that
%% describes integer ranges between 1 and `Scale'.
-spec pretty_with_integers(owner_weight_list(), owner_weight_list(),integer())->
[owner_int_range()].
pretty_with_integers(OldWeights, NewWeights, Scale) ->
chash_scale_to_int_interval(
make_float_map(make_float_map(OldWeights),
NewWeights),
Scale).
%% @doc Create a float tree, which is the rapid lookup data structure
%% for consistent hash queries.
-spec make_tree(float_map()) -> float_tree().
make_tree(Map) ->
chash_nextfloat_list_to_gb_tree(
chash_float_map_to_nextfloat_list(Map)).
%% @doc Low-level function for querying a float tree: the (floating
%% point) point within the unit interval.
-spec query_tree(float(), float_tree()) -> {float(), owner_name()}.
query_tree(Val, Tree) when is_float(Val), 0.0 =< Val, Val =< 1.0 ->
chash_gb_next(Val, Tree).
%% @doc Create a sample float map.
-spec make_demo_map1() -> float_map().
make_demo_map1() ->
{_, Res} = make_demo_map1_i(),
Res.
make_demo_map1_i() ->
Fail1 = {b, 100},
L1 = [{a, 100}, Fail1, {c, 100}],
L2 = L1 ++ [{d, 100}, {e, 100}],
L3 = L2 -- [Fail1],
L4 = L3 ++ [{giant, 300}],
{L4, lists:foldl(fun(New, Old) -> make_float_map(Old, New) end,
make_float_map(L1), [L2, L3, L4])}.
%% @doc Create a sample float map.
-spec make_demo_map2() -> float_map().
make_demo_map2() ->
{L0, _} = make_demo_map1_i(),
L1 = L0 ++ [{h, 100}],
L2 = L1 ++ [{i, 100}],
L3 = L2 ++ [{j, 100}],
lists:foldl(fun(New, Old) -> make_float_map(Old, New) end,
make_demo_map1(), [L1, L2, L3]).
%% @doc Create a human-friendly summary of a float map.
%%
%% The two parts of the summary are: a per-owner total of the unit
%% interval range(s) owned by each owner, and a total sum of all
%% per-owner ranges (which should be 1.0 but is not enforced).
-spec sum_map_weights(float_map()) ->
{{per_owner, float_map()}, {weight_sum, float()}}.
sum_map_weights(Map) ->
L = sum_map_weights(lists:sort(Map), undefined, 0.0) -- [{undefined,0.0}],
WeightSum = lists:sum([Weight || {_, Weight} <- L]),
{{per_owner, L}, {weight_sum, WeightSum}}.
sum_map_weights([{SZ, Weight}|T], SZ, SZ_total) ->
sum_map_weights(T, SZ, SZ_total + Weight);
sum_map_weights([{SZ, Weight}|T], LastSZ, LastSZ_total) ->
[{LastSZ, LastSZ_total}|sum_map_weights(T, SZ, Weight)];
sum_map_weights([], LastSZ, LastSZ_total) ->
[{LastSZ, LastSZ_total}].
%% @doc Query a float map with a binary (inefficient).
-spec hash_binary_via_float_map(binary(), float_map()) ->
{float(), owner_name()}.
hash_binary_via_float_map(Key, Map) ->
Tree = make_tree(Map),
<<Int:(20*8)/unsigned>> = crypto:hash(sha, Key),
Float = Int / ?SHA_MAX,
query_tree(Float, Tree).
%% @doc Query a float tree with a binary.
-spec hash_binary_via_float_tree(binary(), float_tree()) ->
{float(), owner_name()}.
hash_binary_via_float_tree(Key, Tree) ->
<<Int:(20*8)/unsigned>> = crypto:hash(sha, Key),
Float = Int / ?SHA_MAX,
query_tree(Float, Tree).
%%%%% @doc Various usage examples, see source code below this function
%%%%% for full details.
zzz_usage_details() ->
%% %% Make a map. See the code for make_demo_map1() for the order of
%% %% additions & deletions. Here's a brief summary of the 4 steps.
%% %%
%% %% * 'a' through 'e' are weighted @ 100.
%% %% * 'giant' is weighted @ 300.
%% %% * 'b' is removed at step #3.
%% 40> M1 = machi_chash:make_demo_map1().
%% [{a,0.09285714285714286},
%% {giant,0.10714285714285715},
%% {d,0.026190476190476153},
%% {giant,0.10714285714285715},
%% {a,0.04999999999999999},
%% {giant,0.04999999999999999},
%% {d,0.04999999999999999},
%% {giant,0.050000000000000044},
%% {d,0.06666666666666671},
%% {e,0.009523809523809434},
%% {giant,0.05714285714285716},
%% {c,0.14285714285714285},
%% {giant,0.05714285714285716},
%% {e,0.13333333333333341}]
%% %% Map M1 onto the interval of integers 0-10,1000
%% %%
%% %% output = list({SZ_name::term(), Start::integer(), End::integer()})
%% 41> machi_chash:pretty_with_integers(M1, 10*1000).
%% [{a,1,928},
%% {giant,929,1999},
%% {d,2000,2260},
%% {giant,2261,3331},
%% {a,3332,3830},
%% {giant,3831,4329},
%% {d,4330,4828},
%% {giant,4829,5328},
%% {d,5329,5994},
%% {e,5995,6089},
%% {giant,6090,6660},
%% {c,6661,8088},
%% {giant,8089,8659},
%% {e,8659,10000}]
%% %% Sum up all of the weights, make sure it's what we expect:
%% 55> machi_chash:sum_map_weights(M1).
%% {{per_owner,[{a,0.14285714285714285},
%% {c,0.14285714285714285},
%% {d,0.14285714285714285},
%% {e,0.14285714285714285},
%% {giant,0.42857142857142866}]},
%% {weight_sum,1.0}}
%% %% Make a tree, then query it
%% %% (Hash::float(), tree()) -> {NextLargestBoundary::float(), szone()}
%% 58> T1 = machi_chash:make_tree(M1).
%% 59> machi_chash:query_tree(0.2555, T1).
%% {0.3333333333333333,giant}
%% 60> machi_chash:query_tree(0.3555, T1).
%% {0.3833333333333333,a}
%% 61> machi_chash:query_tree(0.4555, T1).
%% {0.4833333333333333,d}
%% %% How about hashing a bunch of strings and see what happens?
%% 74> Key1 = "Hello, world!".
%% "Hello, world!"
%% 75> [{K, element(2, machi_chash:hash_binary_via_float_map(K, M1))} || K <- [lists:sublist(Key1, X) || X <- lists:seq(1, length(Key1))]].
%% [{"H",giant},
%% {"He",giant},
%% {"Hel",giant},
%% {"Hell",e},
%% {"Hello",e},
%% {"Hello,",giant},
%% {"Hello, ",e},
%% {"Hello, w",e},
%% {"Hello, wo",giant},
%% {"Hello, wor",d},
%% {"Hello, worl",giant},
%% {"Hello, world",e},
%% {"Hello, world!",d}]
ok.

589
src/machi_flu1.erl Normal file
View file

@ -0,0 +1,589 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_flu1).
-include_lib("kernel/include/file.hrl").
-include("machi.hrl").
-include("machi_projection.hrl").
-export([start_link/1, stop/1]).
-record(state, {
reg_name :: atom(),
proj_store :: pid(),
append_pid :: pid(),
tcp_port :: non_neg_integer(),
data_dir :: string(),
wedge = true :: 'disabled' | boolean(),
my_epoch_id :: 'undefined',
dbg_props = [] :: list(), % proplist
props = [] :: list() % proplist
}).
start_link([{FluName, TcpPort, DataDir}|Rest])
when is_atom(FluName), is_integer(TcpPort), is_list(DataDir) ->
{ok, spawn_link(fun() -> main2(FluName, TcpPort, DataDir, Rest) end)}.
stop(Pid) ->
case erlang:is_process_alive(Pid) of
true ->
Pid ! forever,
ok;
false ->
error
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%
main2(RegName, TcpPort, DataDir, Rest) ->
S0 = #state{reg_name=RegName,
tcp_port=TcpPort,
data_dir=DataDir,
props=Rest},
AppendPid = start_append_server(S0),
ProjRegName = make_projection_server_regname(RegName),
{ok, ProjectionPid} =
machi_projection_store:start_link(ProjRegName, DataDir, AppendPid),
S1 = S0#state{append_pid=AppendPid,
proj_store=ProjectionPid},
S2 = case proplists:get_value(dbg, Rest) of
undefined ->
S1;
DbgProps ->
S1#state{wedge=disabled,
dbg_props=DbgProps,
props=lists:keydelete(dbg, 1, Rest)}
end,
ListenPid = start_listen_server(S2),
Config_e = machi_util:make_config_filename(DataDir, "unused"),
ok = filelib:ensure_dir(Config_e),
{_, Data_e} = machi_util:make_data_filename(DataDir, "unused"),
ok = filelib:ensure_dir(Data_e),
Projection_e = machi_util:make_projection_filename(DataDir, "unused"),
ok = filelib:ensure_dir(Projection_e),
put(flu_reg_name, RegName),
put(flu_append_pid, AppendPid),
put(flu_projection_pid, ProjectionPid),
put(flu_listen_pid, ListenPid),
receive forever -> ok end.
start_listen_server(S) ->
spawn_link(fun() -> run_listen_server(S) end).
start_append_server(S) ->
spawn_link(fun() -> run_append_server(S) end).
%% start_projection_server(S) ->
%% spawn_link(fun() -> run_projection_server(S) end).
run_listen_server(#state{tcp_port=TcpPort}=S) ->
SockOpts = [{reuseaddr, true},
{mode, binary}, {active, false}, {packet, line}],
{ok, LSock} = gen_tcp:listen(TcpPort, SockOpts),
listen_server_loop(LSock, S).
run_append_server(#state{reg_name=Name}=S) ->
register(Name, self()),
append_server_loop(S).
listen_server_loop(LSock, S) ->
{ok, Sock} = gen_tcp:accept(LSock),
spawn_link(fun() -> net_server_loop(Sock, S) end),
listen_server_loop(LSock, S).
append_server_loop(#state{data_dir=DataDir}=S) ->
receive
{seq_append, From, Prefix, Chunk, CSum} ->
spawn(fun() -> append_server_dispatch(From, Prefix, Chunk, CSum,
DataDir) end),
append_server_loop(S);
{wedge_state_change, Boolean} ->
append_server_loop(S#state{wedge=Boolean})
end.
-define(EpochIDSpace, (4+20)).
net_server_loop(Sock, #state{reg_name=RegName, data_dir=DataDir}=S) ->
ok = inet:setopts(Sock, [{packet, line}]),
case gen_tcp:recv(Sock, 0, 60*1000) of
{ok, Line} ->
%% machi_util:verb("Got: ~p\n", [Line]),
PrefixLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 8 - 1,
FileLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 16 - 8 - 1,
CSumFileLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 1,
WriteFileLenLF = byte_size(Line) - 7 - ?EpochIDSpace - 16 - 8 - 1,
DelFileLenLF = byte_size(Line) - 14 - ?EpochIDSpace - 1,
case Line of
%% For normal use
<<"A ",
_EpochIDRaw:(?EpochIDSpace)/binary,
LenHex:8/binary,
Prefix:PrefixLenLF/binary, "\n">> ->
do_net_server_append(RegName, Sock, LenHex, Prefix);
<<"R ",
_EpochIDRaw:(?EpochIDSpace)/binary,
OffsetHex:16/binary, LenHex:8/binary,
File:FileLenLF/binary, "\n">> ->
do_net_server_read(Sock, OffsetHex, LenHex, File, DataDir);
<<"L ", _EpochIDRaw:(?EpochIDSpace)/binary, "\n">> ->
do_net_server_listing(Sock, DataDir);
<<"C ",
_EpochIDRaw:(?EpochIDSpace)/binary,
File:CSumFileLenLF/binary, "\n">> ->
do_net_server_checksum_listing(Sock, File, DataDir);
<<"QUIT\n">> ->
catch gen_tcp:close(Sock),
exit(normal);
<<"QUIT\r\n">> ->
catch gen_tcp:close(Sock),
exit(normal);
%% For "internal" replication only.
<<"W-repl ",
_EpochIDRaw:(?EpochIDSpace)/binary,
OffsetHex:16/binary, LenHex:8/binary,
File:WriteFileLenLF/binary, "\n">> ->
do_net_server_write(Sock, OffsetHex, LenHex, File, DataDir);
%% For data migration only.
<<"DEL-migration ",
_EpochIDRaw:(?EpochIDSpace)/binary,
File:DelFileLenLF/binary, "\n">> ->
do_net_server_delete_migration_only(Sock, File, DataDir);
%% For erasure coding hackityhack
<<"TRUNC-hack--- ",
_EpochIDRaw:(?EpochIDSpace)/binary,
File:DelFileLenLF/binary, "\n">> ->
do_net_server_truncate_hackityhack(Sock, File, DataDir);
<<"PROJ ", LenHex:8/binary, "\n">> ->
do_projection_command(Sock, LenHex, S);
_ ->
machi_util:verb("Else Got: ~p\n", [Line]),
gen_tcp:send(Sock, "ERROR SYNTAX\n"),
catch gen_tcp:close(Sock),
exit(normal)
end,
net_server_loop(Sock, S);
_ ->
catch gen_tcp:close(Sock),
exit(normal)
end.
append_server_dispatch(From, Prefix, Chunk, CSum, DataDir) ->
Pid = write_server_get_pid(Prefix, DataDir),
Pid ! {seq_append, From, Prefix, Chunk, CSum},
exit(normal).
do_net_server_append(RegName, Sock, LenHex, Prefix) ->
%% TODO: robustify against other invalid path characters such as NUL
case sanitize_file_string(Prefix) of
ok ->
do_net_server_append2(RegName, Sock, LenHex, Prefix);
_ ->
ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG">>)
end.
sanitize_file_string(Str) ->
case re:run(Str, "/") of
nomatch ->
ok;
_ ->
error
end.
do_net_server_append2(RegName, Sock, LenHex, Prefix) ->
<<Len:32/big>> = machi_util:hexstr_to_bin(LenHex),
ok = inet:setopts(Sock, [{packet, raw}]),
{ok, Chunk} = gen_tcp:recv(Sock, Len, 60*1000),
CSum = machi_util:checksum(Chunk),
try
RegName ! {seq_append, self(), Prefix, Chunk, CSum}
catch error:badarg ->
error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE])
end,
receive
{assignment, Offset, File} ->
OffsetHex = machi_util:bin_to_hexstr(<<Offset:64/big>>),
Out = io_lib:format("OK ~s ~s\n", [OffsetHex, File]),
ok = gen_tcp:send(Sock, Out)
after 10*1000 ->
ok = gen_tcp:send(Sock, "TIMEOUT\n")
end.
do_net_server_read(Sock, OffsetHex, LenHex, FileBin, DataDir) ->
DoItFun = fun(FH, Offset, Len) ->
case file:pread(FH, Offset, Len) of
{ok, Bytes} when byte_size(Bytes) == Len ->
gen_tcp:send(Sock, ["OK\n", Bytes]);
{ok, Bytes} ->
machi_util:verb("ok read but wanted ~p got ~p: ~p @ offset ~p\n",
[Len, size(Bytes), FileBin, Offset]),
ok = gen_tcp:send(Sock, "ERROR PARTIAL-READ\n");
eof ->
perhaps_do_net_server_ec_read(Sock, FH);
_Else2 ->
machi_util:verb("Else2 ~p ~p ~P\n",
[Offset, Len, _Else2, 20]),
ok = gen_tcp:send(Sock, "ERROR BAD-READ\n")
end
end,
do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir,
[read, binary, raw], DoItFun).
do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir,
FileOpts, DoItFun) ->
case sanitize_file_string(FileBin) of
ok ->
do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin,
DataDir, FileOpts, DoItFun);
_ ->
ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG\n">>)
end.
do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir,
FileOpts, DoItFun) ->
<<Offset:64/big>> = machi_util:hexstr_to_bin(OffsetHex),
<<Len:32/big>> = machi_util:hexstr_to_bin(LenHex),
{_, Path} = machi_util:make_data_filename(DataDir, FileBin),
OptsHasWrite = lists:member(write, FileOpts),
case file:open(Path, FileOpts) of
{ok, FH} ->
try
DoItFun(FH, Offset, Len)
after
file:close(FH)
end;
{error, enoent} when OptsHasWrite ->
do_net_server_readwrite_common(
Sock, OffsetHex, LenHex, FileBin, DataDir,
FileOpts, DoItFun);
_Else ->
%%%%%% keep?? machi_util:verb("Else ~p ~p ~p ~p\n", [Offset, Len, Path, _Else]),
ok = gen_tcp:send(Sock, <<"ERROR BAD-IO\n">>)
end.
do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir) ->
CSumPath = machi_util:make_checksum_filename(DataDir, FileBin),
case file:open(CSumPath, [append, raw, binary, delayed_write]) of
{ok, FHc} ->
do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc);
{error, enoent} ->
ok = filelib:ensure_dir(CSumPath),
do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir)
end.
do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc) ->
DoItFun = fun(FHd, Offset, Len) ->
ok = inet:setopts(Sock, [{packet, raw}]),
{ok, Chunk} = gen_tcp:recv(Sock, Len),
CSum = machi_util:checksum(Chunk),
case file:pwrite(FHd, Offset, Chunk) of
ok ->
CSumHex = machi_util:bin_to_hexstr(CSum),
CSum_info = [OffsetHex, 32, LenHex, 32, CSumHex, 10],
ok = file:write(FHc, CSum_info),
ok = file:close(FHc),
gen_tcp:send(Sock, <<"OK\n">>);
_Else3 ->
machi_util:verb("Else3 ~p ~p ~p\n",
[Offset, Len, _Else3]),
ok = gen_tcp:send(Sock, "ERROR BAD-PWRITE\n")
end
end,
do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir,
[write, read, binary, raw], DoItFun).
perhaps_do_net_server_ec_read(Sock, FH) ->
case file:pread(FH, 0, ?MINIMUM_OFFSET) of
{ok, Bin} when byte_size(Bin) == ?MINIMUM_OFFSET ->
decode_and_reply_net_server_ec_read(Sock, Bin);
{ok, _AnythingElse} ->
ok = gen_tcp:send(Sock, "ERROR PARTIAL-READ2\n");
_AnythingElse ->
ok = gen_tcp:send(Sock, "ERROR BAD-PREAD\n")
end.
decode_and_reply_net_server_ec_read(Sock, <<"a ", Rest/binary>>) ->
decode_and_reply_net_server_ec_read_version_a(Sock, Rest);
decode_and_reply_net_server_ec_read(Sock, <<0:8, _/binary>>) ->
ok = gen_tcp:send(Sock, <<"ERROR NOT-ERASURE\n">>).
decode_and_reply_net_server_ec_read_version_a(Sock, Rest) ->
%% <<BodyLenHex:4/binary, " ", StripeWidthHex:16/binary, " ",
%% OrigFileLenHex:16/binary, " ", _/binary>> = Rest,
HdrLen = 80 - 2 - 4 - 1,
<<BodyLenHex:4/binary, " ", Hdr:HdrLen/binary, Rest2/binary>> = Rest,
<<BodyLen:16/big>> = machi_util:hexstr_to_bin(BodyLenHex),
<<Body:BodyLen/binary, _/binary>> = Rest2,
ok = gen_tcp:send(Sock, ["ERASURE ", BodyLenHex, " ", Hdr, Body]).
do_net_server_listing(Sock, DataDir) ->
{_, WildPath} = machi_util:make_data_filename(DataDir, ""),
Files = filelib:wildcard("*", WildPath),
Out = ["OK\n",
[begin
{ok, FI} = file:read_file_info(WildPath ++ "/" ++ File),
Size = FI#file_info.size,
SizeBin = <<Size:64/big>>,
[machi_util:bin_to_hexstr(SizeBin), <<" ">>,
list_to_binary(File), <<"\n">>]
end || File <- Files],
".\n"
],
ok = gen_tcp:send(Sock, Out).
do_net_server_checksum_listing(Sock, File, DataDir) ->
case sanitize_file_string(File) of
ok ->
do_net_server_checksum_listing2(Sock, File, DataDir);
_ ->
ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG\n">>)
end.
do_net_server_checksum_listing2(Sock, File, DataDir) ->
ok = sync_checksum_file(File),
CSumPath = machi_util:make_checksum_filename(DataDir, File),
case file:open(CSumPath, [read, raw, binary]) of
{ok, FH} ->
{ok, FI} = file:read_file_info(CSumPath),
Len = FI#file_info.size,
LenHex = list_to_binary(machi_util:bin_to_hexstr(<<Len:64/big>>)),
%% Client has option of line-by-line with "." terminator,
%% or using the offset in the OK message to slurp things
%% down by exact byte size.
ok = gen_tcp:send(Sock, [<<"OK ">>, LenHex, <<"\n">>]),
do_net_copy_bytes(FH, Sock),
ok = file:close(FH),
ok = gen_tcp:send(Sock, ".\n");
{error, enoent} ->
ok = gen_tcp:send(Sock, "ERROR NO-SUCH-FILE\n");
_ ->
ok = gen_tcp:send(Sock, "ERROR\n")
end.
sync_checksum_file(File) ->
Prefix = re:replace(File, "\\..*", "", [{return, binary}]),
case write_server_find_pid(Prefix) of
undefined ->
ok;
Pid ->
Ref = make_ref(),
Pid ! {sync_stuff, self(), Ref},
receive
{sync_finished, Ref} ->
ok
after 5000 ->
case write_server_find_pid(Prefix) of
undefined ->
ok;
Pid2 when Pid2 /= Pid ->
ok;
_Pid2 ->
error
end
end
end.
do_net_copy_bytes(FH, Sock) ->
case file:read(FH, 1024*1024) of
{ok, Bin} ->
ok = gen_tcp:send(Sock, Bin),
do_net_copy_bytes(FH, Sock);
eof ->
ok
end.
do_net_server_delete_migration_only(Sock, File, DataDir) ->
case sanitize_file_string(File) of
ok ->
do_net_server_delete_migration_only2(Sock, File, DataDir);
_ ->
ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG\n">>)
end.
do_net_server_delete_migration_only2(Sock, File, DataDir) ->
{_, Path} = machi_util:make_data_filename(DataDir, File),
case file:delete(Path) of
ok ->
ok = gen_tcp:send(Sock, "OK\n");
{error, enoent} ->
ok = gen_tcp:send(Sock, "ERROR NO-SUCH-FILE\n");
_ ->
ok = gen_tcp:send(Sock, "ERROR\n")
end.
do_net_server_truncate_hackityhack(Sock, File, DataDir) ->
case sanitize_file_string(File) of
ok ->
do_net_server_truncate_hackityhack2(Sock, File, DataDir);
_ ->
ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG\n">>)
end.
do_net_server_truncate_hackityhack2(Sock, File, DataDir) ->
{_, Path} = machi_util:make_data_filename(DataDir, File),
case file:open(Path, [read, write, binary, raw]) of
{ok, FH} ->
try
{ok, ?MINIMUM_OFFSET} = file:position(FH, ?MINIMUM_OFFSET),
ok = file:truncate(FH),
ok = gen_tcp:send(Sock, "OK\n")
after
file:close(FH)
end;
{error, enoent} ->
ok = gen_tcp:send(Sock, "ERROR NO-SUCH-FILE\n");
_ ->
ok = gen_tcp:send(Sock, "ERROR\n")
end.
write_server_get_pid(Prefix, DataDir) ->
case write_server_find_pid(Prefix) of
undefined ->
start_seq_append_server(Prefix, DataDir),
timer:sleep(1),
write_server_get_pid(Prefix, DataDir);
Pid ->
Pid
end.
write_server_find_pid(Prefix) ->
RegName = machi_util:make_regname(Prefix),
whereis(RegName).
start_seq_append_server(Prefix, DataDir) ->
spawn_link(fun() -> run_seq_append_server(Prefix, DataDir) end).
run_seq_append_server(Prefix, DataDir) ->
true = register(machi_util:make_regname(Prefix), self()),
run_seq_append_server2(Prefix, DataDir).
run_seq_append_server2(Prefix, DataDir) ->
FileNum = machi_util:read_max_filenum(DataDir, Prefix) + 1,
case machi_util:increment_max_filenum(DataDir, Prefix) of
ok ->
machi_util:increment_max_filenum(DataDir, Prefix),
machi_util:info_msg("start: ~p server at file ~w\n",
[Prefix, FileNum]),
seq_append_server_loop(DataDir, Prefix, FileNum);
Else ->
error_logger:error_msg("start: ~p server at file ~w: ~p\n",
[Prefix, FileNum, Else]),
exit(Else)
end.
seq_append_server_loop(DataDir, Prefix, FileNum) ->
SequencerNameHack = lists:flatten(io_lib:format(
"~.36B~.36B",
[element(3,now()),
list_to_integer(os:getpid())])),
{File, FullPath} = machi_util:make_data_filename(
DataDir, Prefix, SequencerNameHack, FileNum),
{ok, FHd} = file:open(FullPath,
[write, binary, raw]),
%% [write, binary, raw, delayed_write]),
CSumPath = machi_util:make_checksum_filename(
DataDir, Prefix, SequencerNameHack, FileNum),
{ok, FHc} = file:open(CSumPath, [append, raw, binary, delayed_write]),
seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}, FileNum,
?MINIMUM_OFFSET).
seq_append_server_loop(DataDir, Prefix, _File, {FHd,FHc}, FileNum, Offset)
when Offset > ?MAX_FILE_SIZE ->
ok = file:close(FHd),
ok = file:close(FHc),
machi_util:info_msg("rollover: ~p server at file ~w offset ~w\n",
[Prefix, FileNum, Offset]),
run_seq_append_server2(Prefix, DataDir);
seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) ->
receive
{seq_append, From, Prefix, Chunk, CSum} ->
ok = file:pwrite(FHd, Offset, Chunk),
From ! {assignment, Offset, File},
Len = byte_size(Chunk),
OffsetHex = machi_util:bin_to_hexstr(<<Offset:64/big>>),
LenHex = machi_util:bin_to_hexstr(<<Len:32/big>>),
CSumHex = machi_util:bin_to_hexstr(CSum),
CSum_info = [OffsetHex, 32, LenHex, 32, CSumHex, 10],
ok = file:write(FHc, CSum_info),
seq_append_server_loop(DataDir, Prefix, File, FH_,
FileNum, Offset + Len);
{sync_stuff, FromPid, Ref} ->
file:sync(FHc),
FromPid ! {sync_finished, Ref},
seq_append_server_loop(DataDir, Prefix, File, FH_,
FileNum, Offset)
after 30*1000 ->
ok = file:close(FHd),
ok = file:close(FHc),
machi_util:info_msg("stop: ~p server at file ~w offset ~w\n",
[Prefix, FileNum, Offset]),
exit(normal)
end.
do_projection_command(Sock, LenHex, S) ->
try
Len = machi_util:hexstr_to_int(LenHex),
ok = inet:setopts(Sock, [{packet, raw}]),
{ok, ProjCmdBin} = gen_tcp:recv(Sock, Len),
ok = inet:setopts(Sock, [{packet, line}]),
ProjCmd = binary_to_term(ProjCmdBin),
put(hack, ProjCmd),
Res = handle_projection_command(ProjCmd, S),
ResBin = term_to_binary(Res),
ResLenHex = machi_util:int_to_hexbin(byte_size(ResBin), 32),
ok = gen_tcp:send(Sock, [<<"OK ">>, ResLenHex, <<"\n">>, ResBin])
catch
What:Why ->
io:format(user, "OOPS ~p\n", [get(hack)]),
io:format(user, "OOPS ~p ~p ~p\n", [What, Why, erlang:get_stacktrace()]),
WHA = list_to_binary(io_lib:format("TODO-YOLO.~w:~w-~w",
[What, Why, erlang:get_stacktrace()])),
_ = (catch gen_tcp:send(Sock, [<<"ERROR ">>, WHA, <<"\n">>]))
end.
handle_projection_command({get_latest_epoch, ProjType},
#state{proj_store=ProjStore}) ->
machi_projection_store:get_latest_epoch(ProjStore, ProjType);
handle_projection_command({read_latest_projection, ProjType},
#state{proj_store=ProjStore}) ->
machi_projection_store:read_latest_projection(ProjStore, ProjType);
handle_projection_command({read_projection, ProjType, Epoch},
#state{proj_store=ProjStore}) ->
machi_projection_store:read(ProjStore, ProjType, Epoch);
handle_projection_command({write_projection, ProjType, Proj},
#state{proj_store=ProjStore}) ->
machi_projection_store:write(ProjStore, ProjType, Proj);
handle_projection_command({get_all, ProjType},
#state{proj_store=ProjStore}) ->
machi_projection_store:get_all(ProjStore, ProjType);
handle_projection_command({list_all, ProjType},
#state{proj_store=ProjStore}) ->
machi_projection_store:list_all(ProjStore, ProjType);
handle_projection_command(Else, _S) ->
{error, unknown_cmd, Else}.
make_projection_server_regname(BaseName) ->
list_to_atom(atom_to_list(BaseName) ++ "_projection").

647
src/machi_flu1_client.erl Normal file
View file

@ -0,0 +1,647 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_flu1_client).
-include("machi.hrl").
-include("machi_projection.hrl").
-export([
%% File API
append_chunk/4, append_chunk/5,
read_chunk/5, read_chunk/6,
checksum_list/3, checksum_list/4,
list_files/2, list_files/3,
%% Projection API
get_latest_epoch/2, get_latest_epoch/3,
read_latest_projection/2, read_latest_projection/3,
read_projection/3, read_projection/4,
write_projection/3, write_projection/4,
get_all/2, get_all/3,
list_all/2, list_all/3,
%% Common API
quit/1
]).
%% For "internal" replication only.
-export([
write_chunk/5, write_chunk/6,
delete_migration/3, delete_migration/4,
trunc_hack/3, trunc_hack/4
]).
-type chunk() :: binary() | iolist(). % client can use either
-type chunk_csum() :: {file_offset(), chunk_size(), binary()}.
-type chunk_s() :: binary(). % server always uses binary()
-type chunk_pos() :: {file_offset(), chunk_size(), file_name_s()}.
-type chunk_size() :: non_neg_integer().
-type epoch_csum() :: binary().
-type epoch_num() :: non_neg_integer().
-type epoch_id() :: {epoch_num(), epoch_csum()}.
-type file_info() :: {file_size(), file_name_s()}.
-type file_name() :: binary() | list().
-type file_name_s() :: binary(). % server reply
-type file_offset() :: non_neg_integer().
-type file_size() :: non_neg_integer().
-type file_prefix() :: binary() | list().
-type inet_host() :: inet:ip_address() | inet:hostname().
-type inet_port() :: inet:port_number().
-type projection() :: #projection_v1{}.
-type projection_type() :: 'public' | 'private'.
-export_type([epoch_id/0]).
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix'.
-spec append_chunk(port(), epoch_id(), file_prefix(), chunk()) ->
{ok, chunk_pos()} | {error, term()}.
append_chunk(Sock, EpochID, Prefix, Chunk) ->
append_chunk2(Sock, EpochID, Prefix, Chunk).
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix'.
-spec append_chunk(inet_host(), inet_port(),
epoch_id(), file_prefix(), chunk()) ->
{ok, chunk_pos()} | {error, term()}.
append_chunk(Host, TcpPort, EpochID, Prefix, Chunk) ->
Sock = machi_util:connect(Host, TcpPort),
try
append_chunk2(Sock, EpochID, Prefix, Chunk)
after
catch gen_tcp:close(Sock)
end.
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
-spec read_chunk(port(), epoch_id(), file_name(), file_offset(), chunk_size()) ->
{ok, chunk_s()} | {error, term()}.
read_chunk(Sock, EpochID, File, Offset, Size)
when Offset >= ?MINIMUM_OFFSET, Size >= 0 ->
read_chunk2(Sock, EpochID, File, Offset, Size).
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
-spec read_chunk(inet_host(), inet_port(), epoch_id(),
file_name(), file_offset(), chunk_size()) ->
{ok, chunk_s()} | {error, term()}.
read_chunk(Host, TcpPort, EpochID, File, Offset, Size)
when Offset >= ?MINIMUM_OFFSET, Size >= 0 ->
Sock = machi_util:connect(Host, TcpPort),
try
read_chunk2(Sock, EpochID, File, Offset, Size)
after
catch gen_tcp:close(Sock)
end.
%% @doc Fetch the list of chunk checksums for `File'.
-spec checksum_list(port(), epoch_id(), file_name()) ->
{ok, [chunk_csum()]} | {error, term()}.
checksum_list(Sock, EpochID, File) when is_port(Sock) ->
checksum_list2(Sock, EpochID, File).
%% @doc Fetch the list of chunk checksums for `File'.
-spec checksum_list(inet_host(), inet_port(), epoch_id(), file_name()) ->
{ok, [chunk_csum()]} | {error, term()}.
checksum_list(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
Sock = machi_util:connect(Host, TcpPort),
try
checksum_list2(Sock, EpochID, File)
after
catch gen_tcp:close(Sock)
end.
%% @doc Fetch the list of all files on the remote FLU.
-spec list_files(port(), epoch_id()) ->
{ok, [file_info()]} | {error, term()}.
list_files(Sock, EpochID) when is_port(Sock) ->
list2(Sock, EpochID).
%% @doc Fetch the list of all files on the remote FLU.
-spec list_files(inet_host(), inet_port(), epoch_id()) ->
{ok, [file_info()]} | {error, term()}.
list_files(Host, TcpPort, EpochID) when is_integer(TcpPort) ->
Sock = machi_util:connect(Host, TcpPort),
try
list2(Sock, EpochID)
after
catch gen_tcp:close(Sock)
end.
%% @doc Get the latest epoch number from the FLU's projection store.
-spec get_latest_epoch(port(), projection_type()) ->
{ok, -1|non_neg_integer()} | {error, term()}.
get_latest_epoch(Sock, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
get_latest_epoch2(Sock, ProjType).
%% @doc Get the latest epoch number from the FLU's projection store.
-spec get_latest_epoch(inet_host(), inet_port(),
projection_type()) ->
{ok, -1|non_neg_integer()} | {error, term()}.
get_latest_epoch(Host, TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
Sock = machi_util:connect(Host, TcpPort),
try
get_latest_epoch2(Sock, ProjType)
after
catch gen_tcp:close(Sock)
end.
%% @doc Get the latest epoch number from the FLU's projection store.
-spec read_latest_projection(port(), projection_type()) ->
{ok, projection()} | {error, not_written} | {error, term()}.
read_latest_projection(Sock, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
read_latest_projection2(Sock, ProjType).
%% @doc Get the latest epoch number from the FLU's projection store.
-spec read_latest_projection(inet_host(), inet_port(),
projection_type()) ->
{ok, projection()} | {error, not_written} | {error, term()}.
read_latest_projection(Host, TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
Sock = machi_util:connect(Host, TcpPort),
try
read_latest_projection2(Sock, ProjType)
after
catch gen_tcp:close(Sock)
end.
%% @doc Read a projection `Proj' of type `ProjType'.
-spec read_projection(port(), projection_type(), epoch_num()) ->
{ok, projection()} | {error, written} | {error, term()}.
read_projection(Sock, ProjType, Epoch)
when ProjType == 'public' orelse ProjType == 'private' ->
read_projection2(Sock, ProjType, Epoch).
%% @doc Read a projection `Proj' of type `ProjType'.
-spec read_projection(inet_host(), inet_port(),
projection_type(), epoch_num()) ->
{ok, projection()} | {error, written} | {error, term()}.
read_projection(Host, TcpPort, ProjType, Epoch)
when ProjType == 'public' orelse ProjType == 'private' ->
Sock = machi_util:connect(Host, TcpPort),
try
read_projection2(Sock, ProjType, Epoch)
after
catch gen_tcp:close(Sock)
end.
%% @doc Write a projection `Proj' of type `ProjType'.
-spec write_projection(port(), projection_type(), projection()) ->
'ok' | {error, written} | {error, term()}.
write_projection(Sock, ProjType, Proj)
when ProjType == 'public' orelse ProjType == 'private',
is_record(Proj, projection_v1) ->
write_projection2(Sock, ProjType, Proj).
%% @doc Write a projection `Proj' of type `ProjType'.
-spec write_projection(inet_host(), inet_port(),
projection_type(), projection()) ->
'ok' | {error, written} | {error, term()}.
write_projection(Host, TcpPort, ProjType, Proj)
when ProjType == 'public' orelse ProjType == 'private',
is_record(Proj, projection_v1) ->
Sock = machi_util:connect(Host, TcpPort),
try
write_projection2(Sock, ProjType, Proj)
after
catch gen_tcp:close(Sock)
end.
%% @doc Get all projections from the FLU's projection store.
-spec get_all(port(), projection_type()) ->
{ok, [projection()]} | {error, term()}.
get_all(Sock, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
get_all2(Sock, ProjType).
%% @doc Get all projections from the FLU's projection store.
-spec get_all(inet_host(), inet_port(),
projection_type()) ->
{ok, [projection()]} | {error, term()}.
get_all(Host, TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
Sock = machi_util:connect(Host, TcpPort),
try
get_all2(Sock, ProjType)
after
catch gen_tcp:close(Sock)
end.
%% @doc Get all epoch numbers from the FLU's projection store.
-spec list_all(port(), projection_type()) ->
{ok, [non_neg_integer()]} | {error, term()}.
list_all(Sock, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
list_all2(Sock, ProjType).
%% @doc Get all epoch numbers from the FLU's projection store.
-spec list_all(inet_host(), inet_port(),
projection_type()) ->
{ok, [non_neg_integer()]} | {error, term()}.
list_all(Host, TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
Sock = machi_util:connect(Host, TcpPort),
try
list_all2(Sock, ProjType)
after
catch gen_tcp:close(Sock)
end.
%% @doc Quit &amp; close the connection to remote FLU.
-spec quit(port()) ->
ok.
quit(Sock) when is_port(Sock) ->
catch (_ = gen_tcp:send(Sock, <<"QUIT\n">>)),
catch gen_tcp:close(Sock),
ok.
%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% @doc Restricted API: Write a chunk of already-sequenced data to
%% `File' at `Offset'.
-spec write_chunk(port(), epoch_id(), file_name(), file_offset(), chunk()) ->
ok | {error, term()}.
write_chunk(Sock, EpochID, File, Offset, Chunk)
when Offset >= ?MINIMUM_OFFSET ->
write_chunk2(Sock, EpochID, File, Offset, Chunk).
%% @doc Restricted API: Write a chunk of already-sequenced data to
%% `File' at `Offset'.
-spec write_chunk(inet_host(), inet_port(),
epoch_id(), file_name(), file_offset(), chunk()) ->
ok | {error, term()}.
write_chunk(Host, TcpPort, EpochID, File, Offset, Chunk)
when Offset >= ?MINIMUM_OFFSET ->
Sock = machi_util:connect(Host, TcpPort),
try
write_chunk2(Sock, EpochID, File, Offset, Chunk)
after
catch gen_tcp:close(Sock)
end.
%% @doc Restricted API: Delete a file after it has been successfully
%% migrated.
-spec delete_migration(port(), epoch_id(), file_name()) ->
ok | {error, term()}.
delete_migration(Sock, EpochID, File) when is_port(Sock) ->
delete_migration2(Sock, EpochID, File).
%% @doc Restricted API: Delete a file after it has been successfully
%% migrated.
-spec delete_migration(inet_host(), inet_port(), epoch_id(), file_name()) ->
ok | {error, term()}.
delete_migration(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
Sock = machi_util:connect(Host, TcpPort),
try
delete_migration2(Sock, EpochID, File)
after
catch gen_tcp:close(Sock)
end.
%% @doc Restricted API: Truncate a file after it has been successfully
%% erasure coded.
-spec trunc_hack(port(), epoch_id(), file_name()) ->
ok | {error, term()}.
trunc_hack(Sock, EpochID, File) when is_port(Sock) ->
trunc_hack2(Sock, EpochID, File).
%% @doc Restricted API: Truncate a file after it has been successfully
%% erasure coded.
-spec trunc_hack(inet_host(), inet_port(), epoch_id(), file_name()) ->
ok | {error, term()}.
trunc_hack(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
Sock = machi_util:connect(Host, TcpPort),
try
trunc_hack2(Sock, EpochID, File)
after
catch gen_tcp:close(Sock)
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%
append_chunk2(Sock, EpochID, Prefix0, Chunk0) ->
try
%% TODO: add client-side checksum to the server's protocol
%% _ = crypto:hash(md5, Chunk),
Prefix = machi_util:make_binary(Prefix0),
Chunk = machi_util:make_binary(Chunk0),
Len = iolist_size(Chunk0),
true = (Len =< ?MAX_CHUNK_SIZE),
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
LenHex = machi_util:int_to_hexbin(Len, 32),
Cmd = [<<"A ">>, EpochIDRaw, LenHex, Prefix, 10],
ok = gen_tcp:send(Sock, [Cmd, Chunk]),
{ok, Line} = gen_tcp:recv(Sock, 0),
PathLen = byte_size(Line) - 3 - 16 - 1 - 1,
case Line of
<<"OK ", OffsetHex:16/binary, " ",
Path:PathLen/binary, _:1/binary>> ->
Offset = machi_util:hexstr_to_int(OffsetHex),
{ok, {Offset, Len, Path}};
<<"ERROR BAD-ARG", _/binary>> ->
{error, bad_arg};
<<"ERROR ", Rest/binary>> ->
{error, Rest}
end
catch
throw:Error ->
Error;
error:{badmatch,_}=BadMatch ->
{error, {badmatch, BadMatch, erlang:get_stacktrace()}}
end.
read_chunk2(Sock, EpochID, File0, Offset, Size) ->
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
File = machi_util:make_binary(File0),
PrefixHex = machi_util:int_to_hexbin(Offset, 64),
SizeHex = machi_util:int_to_hexbin(Size, 32),
CmdLF = [$R, 32, EpochIDRaw, PrefixHex, SizeHex, File, 10],
ok = gen_tcp:send(Sock, CmdLF),
case gen_tcp:recv(Sock, 3) of
{ok, <<"OK\n">>} ->
{ok, _Chunk}=Res = gen_tcp:recv(Sock, Size),
Res;
{ok, Else} ->
{ok, OldOpts} = inet:getopts(Sock, [packet]),
ok = inet:setopts(Sock, [{packet, line}]),
{ok, Else2} = gen_tcp:recv(Sock, 0),
ok = inet:setopts(Sock, OldOpts),
case Else of
<<"ERA">> ->
{error, todo_erasure_coded}; %% escript_cc_parse_ec_info(Sock, Line, Else2);
<<"ERR">> ->
case Else2 of
<<"OR BAD-IO\n">> ->
{error, no_such_file};
<<"OR NOT-ERASURE\n">> ->
{error, no_such_file};
<<"OR BAD-ARG\n">> ->
{error, bad_arg};
<<"OR PARTIAL-READ\n">> ->
{error, partial_read};
_ ->
{error, Else2}
end;
_ ->
{error, {whaaa, <<Else/binary, Else2/binary>>}}
end
end.
list2(Sock, EpochID) ->
try
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
ok = gen_tcp:send(Sock, [<<"L ">>, EpochIDRaw, <<"\n">>]),
ok = inet:setopts(Sock, [{packet, line}]),
{ok, <<"OK\n">>} = gen_tcp:recv(Sock, 0),
Res = list3(gen_tcp:recv(Sock, 0), Sock),
ok = inet:setopts(Sock, [{packet, raw}]),
{ok, Res}
catch
throw:Error ->
Error;
error:{badmatch,_}=BadMatch ->
{error, {badmatch, BadMatch}}
end.
list3({ok, <<".\n">>}, _Sock) ->
[];
list3({ok, Line}, Sock) ->
FileLen = byte_size(Line) - 16 - 1 - 1,
<<SizeHex:16/binary, " ", File:FileLen/binary, _/binary>> = Line,
Size = machi_util:hexstr_to_int(SizeHex),
[{Size, File}|list3(gen_tcp:recv(Sock, 0), Sock)];
list3(Else, _Sock) ->
throw({server_protocol_error, Else}).
checksum_list2(Sock, EpochID, File) ->
try
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
ok = gen_tcp:send(Sock, [<<"C ">>, EpochIDRaw, File, <<"\n">>]),
ok = inet:setopts(Sock, [{packet, line}]),
case gen_tcp:recv(Sock, 0) of
{ok, <<"OK ", Rest/binary>> = Line} ->
put(status, ok), % may be unset later
RestLen = byte_size(Rest) - 1,
<<LenHex:RestLen/binary, _:1/binary>> = Rest,
<<Len:64/big>> = machi_util:hexstr_to_bin(LenHex),
ok = inet:setopts(Sock, [{packet, raw}]),
{ok, checksum_list_finish(checksum_list_fast(Sock, Len))};
{ok, <<"ERROR NO-SUCH-FILE", _/binary>>} ->
{error, no_such_file};
{ok, <<"ERROR BAD-ARG", _/binary>>} ->
{error, bad_arg};
{ok, Else} ->
throw({server_protocol_error, Else})
end
catch
throw:Error ->
Error;
error:{badmatch,_}=BadMatch ->
{error, {badmatch, BadMatch}}
end.
checksum_list_fast(Sock, 0) ->
{ok, <<".\n">> = _Line} = gen_tcp:recv(Sock, 2),
[];
checksum_list_fast(Sock, Remaining) ->
Num = erlang:min(Remaining, 1024*1024),
{ok, Bytes} = gen_tcp:recv(Sock, Num),
[Bytes|checksum_list_fast(Sock, Remaining - byte_size(Bytes))].
checksum_list_finish(Chunks) ->
Bin = case Chunks of
[X] ->
X;
_ ->
iolist_to_binary(Chunks)
end,
[begin
CSumLen = byte_size(Line) - 16 - 1 - 8 - 1,
<<OffsetHex:16/binary, " ", SizeHex:8/binary, " ",
CSum:CSumLen/binary>> = Line,
{machi_util:hexstr_to_int(OffsetHex),
machi_util:hexstr_to_int(SizeHex),
machi_util:hexstr_to_bin(CSum)}
end || Line <- re:split(Bin, "\n", [{return, binary}]),
Line /= <<>>].
write_chunk2(Sock, EpochID, File0, Offset, Chunk0) ->
try
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
%% TODO: add client-side checksum to the server's protocol
%% _ = crypto:hash(md5, Chunk),
File = machi_util:make_binary(File0),
true = (Offset >= ?MINIMUM_OFFSET),
OffsetHex = machi_util:int_to_hexbin(Offset, 64),
Chunk = machi_util:make_binary(Chunk0),
Len = iolist_size(Chunk0),
true = (Len =< ?MAX_CHUNK_SIZE),
LenHex = machi_util:int_to_hexbin(Len, 32),
Cmd = [<<"W-repl ">>, EpochIDRaw, OffsetHex,
LenHex, File, <<"\n">>],
ok = gen_tcp:send(Sock, [Cmd, Chunk]),
{ok, Line} = gen_tcp:recv(Sock, 0),
PathLen = byte_size(Line) - 3 - 16 - 1 - 1,
case Line of
<<"OK\n">> ->
ok;
<<"ERROR BAD-ARG", _/binary>> ->
{error, bad_arg};
<<"ERROR ", _/binary>>=Else ->
{error, {server_said, Else}}
end
catch
throw:Error ->
Error;
error:{badmatch,_}=BadMatch ->
{error, {badmatch, BadMatch, erlang:get_stacktrace()}}
end.
delete_migration2(Sock, EpochID, File) ->
try
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
Cmd = [<<"DEL-migration ">>, EpochIDRaw, File, <<"\n">>],
ok = gen_tcp:send(Sock, Cmd),
ok = inet:setopts(Sock, [{packet, line}]),
case gen_tcp:recv(Sock, 0) of
{ok, <<"OK\n">>} ->
ok;
{ok, <<"ERROR NO-SUCH-FILE", _/binary>>} ->
{error, no_such_file};
{ok, <<"ERROR BAD-ARG", _/binary>>} ->
{error, bad_arg};
{ok, Else} ->
throw({server_protocol_error, Else})
end
catch
throw:Error ->
Error;
error:{badmatch,_}=BadMatch ->
{error, {badmatch, BadMatch}}
end.
trunc_hack2(Sock, EpochID, File) ->
try
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
Cmd = [<<"TRUNC-hack--- ">>, EpochIDRaw, File, <<"\n">>],
ok = gen_tcp:send(Sock, Cmd),
ok = inet:setopts(Sock, [{packet, line}]),
case gen_tcp:recv(Sock, 0) of
{ok, <<"OK\n">>} ->
ok;
{ok, <<"ERROR NO-SUCH-FILE", _/binary>>} ->
{error, no_such_file};
{ok, <<"ERROR BAD-ARG", _/binary>>} ->
{error, bad_arg};
{ok, Else} ->
throw({server_protocol_error, Else})
end
catch
throw:Error ->
Error;
error:{badmatch,_}=BadMatch ->
{error, {badmatch, BadMatch}}
end.
get_latest_epoch2(Sock, ProjType) ->
ProjCmd = {get_latest_epoch, ProjType},
do_projection_common(Sock, ProjCmd).
read_latest_projection2(Sock, ProjType) ->
ProjCmd = {read_latest_projection, ProjType},
do_projection_common(Sock, ProjCmd).
read_projection2(Sock, ProjType, Epoch) ->
ProjCmd = {read_projection, ProjType, Epoch},
do_projection_common(Sock, ProjCmd).
write_projection2(Sock, ProjType, Proj) ->
ProjCmd = {write_projection, ProjType, Proj},
do_projection_common(Sock, ProjCmd).
get_all2(Sock, ProjType) ->
ProjCmd = {get_all, ProjType},
do_projection_common(Sock, ProjCmd).
list_all2(Sock, ProjType) ->
ProjCmd = {list_all, ProjType},
do_projection_common(Sock, ProjCmd).
do_projection_common(Sock, ProjCmd) ->
try
ProjCmdBin = term_to_binary(ProjCmd),
Len = iolist_size(ProjCmdBin),
true = (Len =< ?MAX_CHUNK_SIZE),
LenHex = machi_util:int_to_hexbin(Len, 32),
Cmd = [<<"PROJ ">>, LenHex, <<"\n">>],
ok = gen_tcp:send(Sock, [Cmd, ProjCmdBin]),
ok = inet:setopts(Sock, [{packet, line}]),
{ok, Line} = gen_tcp:recv(Sock, 0),
case Line of
<<"OK ", ResLenHex:8/binary, "\n">> ->
ResLen = machi_util:hexstr_to_int(ResLenHex),
ok = inet:setopts(Sock, [{packet, raw}]),
{ok, ResBin} = gen_tcp:recv(Sock, ResLen),
ok = inet:setopts(Sock, [{packet, line}]),
binary_to_term(ResBin);
Else ->
{error, Else}
end
catch
throw:Error ->
Error;
error:{badmatch,_}=BadMatch ->
{error, {badmatch, BadMatch, erlang:get_stacktrace()}}
end.

51
src/machi_flu_sup.erl Normal file
View file

@ -0,0 +1,51 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_flu_sup).
-behaviour(supervisor).
%% API
-export([start_link/0]).
%% Supervisor callbacks
-export([init/1]).
-define(SERVER, ?MODULE).
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
init([]) ->
RestartStrategy = one_for_one,
MaxRestarts = 1000,
MaxSecondsBetweenRestarts = 3600,
SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},
Restart = permanent,
Shutdown = 5000,
Type = worker,
{ok, FluList} = application:get_env(machi, flu_list),
FluSpecs = [{FluName, {machi_flu, start_link, [FluArgs]},
Restart, Shutdown, Type, []} ||
{FluName, _Port, _Dir}=FluArgs <- FluList],
{ok, {SupFlags, FluSpecs}}.

119
src/machi_projection.erl Normal file
View file

@ -0,0 +1,119 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_projection).
-include("machi_projection.hrl").
-export([
new/6, new/7, new/8,
update_projection_checksum/1,
update_projection_dbg2/2,
compare/2,
make_projection_summary/1
]).
new(MyName, All_list, UPI_list, Down_list, Repairing_list, Ps) ->
new(0, MyName, All_list, Down_list, UPI_list, Repairing_list, Ps).
new(EpochNum, MyName, All_list, Down_list, UPI_list, Repairing_list, Dbg) ->
new(EpochNum, MyName, All_list, Down_list, UPI_list, Repairing_list,
Dbg, []).
new(EpochNum, MyName, All_list0, Down_list, UPI_list, Repairing_list,
Dbg, Dbg2)
when is_integer(EpochNum), EpochNum >= 0,
is_atom(MyName) orelse is_binary(MyName),
is_list(All_list0), is_list(Down_list), is_list(UPI_list),
is_list(Repairing_list), is_list(Dbg), is_list(Dbg2) ->
{All_list, MemberDict} =
case lists:all(fun(P) when is_record(P, p_srvr) -> true;
(_) -> false
end, All_list0) of
true ->
All = [S#p_srvr.name || S <- All_list0],
TmpL = [{S#p_srvr.name, S} || S <- All_list0],
{All, orddict:from_list(TmpL)};
false ->
All_list1 = lists:zip(All_list0,lists:seq(0,length(All_list0)-1)),
All_list2 = [#p_srvr{name=S, address="localhost",
port=?MACHI_DEFAULT_TCP_PORT+I} ||
{S, I} <- All_list1],
TmpL = [{S#p_srvr.name, S} || S <- All_list2],
{All_list0, orddict:from_list(TmpL)}
end,
true = lists:all(fun(X) when is_atom(X) orelse is_binary(X) -> true;
(_) -> false
end, All_list),
[true = lists:sort(SomeList) == lists:usort(SomeList) ||
SomeList <- [All_list, Down_list, UPI_list, Repairing_list] ],
AllSet = ordsets:from_list(All_list),
DownSet = ordsets:from_list(Down_list),
UPISet = ordsets:from_list(UPI_list),
RepairingSet = ordsets:from_list(Repairing_list),
true = ordsets:is_element(MyName, AllSet),
true = (AllSet == ordsets:union([DownSet, UPISet, RepairingSet])),
true = ordsets:is_disjoint(DownSet, UPISet),
true = ordsets:is_disjoint(DownSet, RepairingSet),
true = ordsets:is_disjoint(UPISet, RepairingSet),
P = #projection_v1{epoch_number=EpochNum,
creation_time=now(),
author_server=MyName,
all_members=All_list,
member_dict=MemberDict,
down=Down_list,
upi=UPI_list,
repairing=Repairing_list,
dbg=Dbg
},
update_projection_dbg2(update_projection_checksum(P), Dbg2).
update_projection_checksum(P) ->
CSum = crypto:hash(sha,
term_to_binary(P#projection_v1{epoch_csum= <<>>,
dbg2=[]})),
P#projection_v1{epoch_csum=CSum}.
update_projection_dbg2(P, Dbg2) when is_list(Dbg2) ->
P#projection_v1{dbg2=Dbg2}.
-spec compare(#projection_v1{}, #projection_v1{}) ->
integer().
compare(#projection_v1{epoch_number=E1, epoch_csum=C1},
#projection_v1{epoch_number=E1, epoch_csum=C1}) ->
0;
compare(#projection_v1{epoch_number=E1},
#projection_v1{epoch_number=E2}) ->
if E1 =< E2 -> -1;
E1 > E2 -> 1
end.
make_projection_summary(#projection_v1{epoch_number=EpochNum,
all_members=_All_list,
down=Down_list,
author_server=Author,
upi=UPI_list,
repairing=Repairing_list,
dbg=Dbg, dbg2=Dbg2}) ->
[{epoch,EpochNum},{author,Author},
{upi,UPI_list},{repair,Repairing_list},{down,Down_list},
{d,Dbg}, {d2,Dbg2}].

View file

@ -0,0 +1,265 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_projection_store).
-include("machi_projection.hrl").
%% API
-export([
start_link/3,
get_latest_epoch/2, get_latest_epoch/3,
read_latest_projection/2, read_latest_projection/3,
read/3, read/4,
write/3, write/4,
get_all/2, get_all/3,
list_all/2, list_all/3
]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {
public_dir = "" :: string(),
private_dir = "" :: string(),
wedged = true :: boolean(),
wedge_notify_pid :: pid() | atom(),
max_public_epoch = -1 :: -1 | non_neg_integer(),
max_private_epoch = -1 :: -1 | non_neg_integer()
}).
start_link(RegName, DataDir, NotifyWedgeStateChanges) ->
gen_server:start_link({local, RegName},
?MODULE, [DataDir, NotifyWedgeStateChanges], []).
get_latest_epoch(PidSpec, ProjType) ->
get_latest_epoch(PidSpec, ProjType, infinity).
get_latest_epoch(PidSpec, ProjType, Timeout)
when ProjType == 'public' orelse ProjType == 'private' ->
g_call(PidSpec, {get_latest_epoch, ProjType}, Timeout).
read_latest_projection(PidSpec, ProjType) ->
read_latest_projection(PidSpec, ProjType, infinity).
read_latest_projection(PidSpec, ProjType, Timeout)
when ProjType == 'public' orelse ProjType == 'private' ->
g_call(PidSpec, {read_latest_projection, ProjType}, Timeout).
read(PidSpec, ProjType, Epoch) ->
read(PidSpec, ProjType, Epoch, infinity).
read(PidSpec, ProjType, Epoch, Timeout)
when ProjType == 'public' orelse ProjType == 'private',
is_integer(Epoch), Epoch >= 0 ->
g_call(PidSpec, {read, ProjType, Epoch}, Timeout).
write(PidSpec, ProjType, Proj) ->
write(PidSpec, ProjType, Proj, infinity).
write(PidSpec, ProjType, Proj, Timeout)
when ProjType == 'public' orelse ProjType == 'private',
is_record(Proj, projection_v1),
is_integer(Proj#projection_v1.epoch_number),
Proj#projection_v1.epoch_number >= 0 ->
g_call(PidSpec, {write, ProjType, Proj}, Timeout).
get_all(PidSpec, ProjType) ->
get_all(PidSpec, ProjType, infinity).
get_all(PidSpec, ProjType, Timeout)
when ProjType == 'public' orelse ProjType == 'private' ->
g_call(PidSpec, {get_all, ProjType}, Timeout).
list_all(PidSpec, ProjType) ->
list_all(PidSpec, ProjType, infinity).
list_all(PidSpec, ProjType, Timeout)
when ProjType == 'public' orelse ProjType == 'private' ->
g_call(PidSpec, {list_all, ProjType}, Timeout).
%%%%%%%%%%%%%%%%%%%%%%%%%%%
g_call(PidSpec, Arg, Timeout) ->
LC1 = lclock_get(),
{Res, LC2} = gen_server:call(PidSpec, {Arg, LC1}, Timeout),
lclock_update(LC2),
Res.
%%%%%%%%%%%%%%%%%%%%%%%%%%%
init([DataDir, NotifyWedgeStateChanges]) ->
lclock_init(),
PublicDir = machi_util:make_projection_filename(DataDir, "public"),
PrivateDir = machi_util:make_projection_filename(DataDir, "private"),
ok = filelib:ensure_dir(PublicDir ++ "/ignored"),
ok = filelib:ensure_dir(PrivateDir ++ "/ignored"),
MaxPublicEpoch = find_max_epoch(PublicDir),
MaxPrivateEpoch = find_max_epoch(PrivateDir),
{ok, #state{public_dir=PublicDir,
private_dir=PrivateDir,
wedged=true,
wedge_notify_pid=NotifyWedgeStateChanges,
max_public_epoch=MaxPublicEpoch,
max_private_epoch=MaxPrivateEpoch}}.
handle_call({{get_latest_epoch, ProjType}, LC1}, _From, S) ->
LC2 = lclock_update(LC1),
Epoch = if ProjType == public -> S#state.max_public_epoch;
ProjType == private -> S#state.max_private_epoch
end,
{reply, {{ok, Epoch}, LC2}, S};
handle_call({{read_latest_projection, ProjType}, LC1}, _From, S) ->
LC2 = lclock_update(LC1),
Epoch = if ProjType == public -> S#state.max_public_epoch;
ProjType == private -> S#state.max_private_epoch
end,
{Reply, NewS} = do_proj_read(ProjType, Epoch, S),
{reply, {Reply, LC2}, NewS};
handle_call({{read, ProjType, Epoch}, LC1}, _From, S) ->
LC2 = lclock_update(LC1),
{Reply, NewS} = do_proj_read(ProjType, Epoch, S),
{reply, {Reply, LC2}, NewS};
handle_call({{write, ProjType, Proj}, LC1}, _From, S) ->
LC2 = lclock_update(LC1),
{Reply, NewS} = do_proj_write(ProjType, Proj, S),
{reply, {Reply, LC2}, NewS};
handle_call({{get_all, ProjType}, LC1}, _From, S) ->
LC2 = lclock_update(LC1),
Dir = pick_path(ProjType, S),
Epochs = find_all(Dir),
All = [begin
{{ok, Proj}, _} = do_proj_read(ProjType, Epoch, S),
Proj
end || Epoch <- Epochs],
{reply, {{ok, All}, LC2}, S};
handle_call({{list_all, ProjType}, LC1}, _From, S) ->
LC2 = lclock_update(LC1),
Dir = pick_path(ProjType, S),
{reply, {{ok, find_all(Dir)}, LC2}, S};
handle_call(_Request, _From, S) ->
Reply = whaaaaaaaaaaaaa,
{reply, Reply, S}.
handle_cast(_Msg, S) ->
{noreply, S}.
handle_info(_Info, S) ->
{noreply, S}.
terminate(_Reason, _S) ->
ok.
code_change(_OldVsn, S, _Extra) ->
{ok, S}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%
do_proj_read(_ProjType, Epoch, S) when Epoch < 0 ->
{{error, not_written}, S};
do_proj_read(ProjType, Epoch, S) ->
Dir = pick_path(ProjType, S),
Path = filename:join(Dir, epoch2name(Epoch)),
case file:read_file(Path) of
{ok, Bin} ->
%% TODO and if Bin is corrupt? (even if binary_to_term() succeeds)
{{ok, binary_to_term(Bin)}, S};
{error, enoent} ->
{{error, not_written}, S};
{error, Else} ->
{{error, Else}, S}
end.
do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) ->
%% TODO: We probably ought to check the projection checksum for sanity, eh?
Dir = pick_path(ProjType, S),
Path = filename:join(Dir, epoch2name(Epoch)),
case file:read_file_info(Path) of
{ok, _FI} ->
{{error, written}, S};
{error, enoent} ->
{ok, FH} = file:open(Path, [write, raw, binary]),
ok = file:write(FH, term_to_binary(Proj)),
ok = file:sync(FH),
ok = file:close(FH),
NewS = if ProjType == public, Epoch > S#state.max_public_epoch ->
io:format(user, "TODO: tell ~p we are wedged by epoch ~p\n", [S#state.wedge_notify_pid, Epoch]),
S#state{max_public_epoch=Epoch, wedged=true};
ProjType == private, Epoch > S#state.max_private_epoch ->
io:format(user, "TODO: tell ~p we are unwedged by epoch ~p\n", [S#state.wedge_notify_pid, Epoch]),
S#state{max_private_epoch=Epoch, wedged=false};
true ->
S
end,
{ok, NewS};
{error, Else} ->
{{error, Else}, S}
end.
pick_path(public, S) ->
S#state.public_dir;
pick_path(private, S) ->
S#state.private_dir.
epoch2name(Epoch) ->
machi_util:int_to_hexstr(Epoch, 32).
name2epoch(Name) ->
machi_util:hexstr_to_int(Name).
find_all(Dir) ->
Fs = filelib:wildcard("*", Dir),
lists:sort([name2epoch(F) || F <- Fs]).
find_max_epoch(Dir) ->
Fs = lists:sort(filelib:wildcard("*", Dir)),
if Fs == [] ->
-1;
true ->
name2epoch(lists:last(Fs))
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%
-ifdef(TEST).
lclock_init() ->
lamport_clock:init().
lclock_get() ->
lamport_clock:get().
lclock_update(LC) ->
lamport_clock:update(LC).
-else. % TEST
lclock_init() ->
ok.
lclock_get() ->
ok.
lclock_update(_LC) ->
ok.
-endif. % TEST

191
src/machi_sequencer.erl Normal file
View file

@ -0,0 +1,191 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_sequencer).
-compile(export_all).
-include_lib("kernel/include/file.hrl").
-define(CONFIG_DIR, "./config").
-define(DATA_DIR, "./data").
seq(Server, Prefix, Size) when is_binary(Prefix), is_integer(Size), Size > -1 ->
Server ! {seq, self(), Prefix, Size},
receive
{assignment, File, Offset} ->
{File, Offset}
after 1*1000 ->
bummer
end.
seq_direct(Prefix, Size) when is_binary(Prefix), is_integer(Size), Size > -1 ->
RegName = make_regname(Prefix),
seq(RegName, Prefix, Size).
start_server() ->
start_server(?MODULE).
start_server(Name) ->
spawn_link(fun() -> run_server(Name) end).
run_server(Name) ->
register(Name, self()),
ets:new(?MODULE, [named_table, public, {write_concurrency, true}]),
server_loop().
server_loop() ->
receive
{seq, From, Prefix, Size} ->
spawn(fun() -> server_dispatch(From, Prefix, Size) end),
server_loop()
end.
server_dispatch(From, Prefix, Size) ->
RegName = make_regname(Prefix),
case whereis(RegName) of
undefined ->
start_prefix_server(Prefix),
timer:sleep(1),
server_dispatch(From, Prefix, Size);
Pid ->
Pid ! {seq, From, Prefix, Size}
end,
exit(normal).
start_prefix_server(Prefix) ->
spawn(fun() -> run_prefix_server(Prefix) end).
run_prefix_server(Prefix) ->
true = register(make_regname(Prefix), self()),
ok = filelib:ensure_dir(?CONFIG_DIR ++ "/unused"),
ok = filelib:ensure_dir(?DATA_DIR ++ "/unused"),
FileNum = read_max_filenum(Prefix) + 1,
ok = increment_max_filenum(Prefix),
prefix_server_loop(Prefix, FileNum).
prefix_server_loop(Prefix, FileNum) ->
File = make_data_filename(Prefix, FileNum),
prefix_server_loop(Prefix, File, FileNum, 0).
prefix_server_loop(Prefix, File, FileNum, Offset) ->
receive
{seq, From, Prefix, Size} ->
From ! {assignment, File, Offset},
prefix_server_loop(Prefix, File, FileNum, Offset + Size)
after 30*1000 ->
io:format("timeout: ~p server stopping\n", [Prefix]),
exit(normal)
end.
make_regname(Prefix) ->
erlang:binary_to_atom(Prefix, latin1).
make_config_filename(Prefix) ->
lists:flatten(io_lib:format("~s/~s", [?CONFIG_DIR, Prefix])).
make_data_filename(Prefix, FileNum) ->
erlang:iolist_to_binary(io_lib:format("~s/~s.~w",
[?DATA_DIR, Prefix, FileNum])).
read_max_filenum(Prefix) ->
case file:read_file_info(make_config_filename(Prefix)) of
{error, enoent} ->
0;
{ok, FI} ->
FI#file_info.size
end.
increment_max_filenum(Prefix) ->
{ok, FH} = file:open(make_config_filename(Prefix), [append]),
ok = file:write(FH, "x"),
%% ok = file:sync(FH),
ok = file:close(FH).
%%%%%%%%%%%%%%%%%
%% basho_bench callbacks
-define(SEQ, ?MODULE).
new(1) ->
start_server(),
timer:sleep(100),
{ok, unused};
new(_Id) ->
{ok, unused}.
run(null, _KeyGen, _ValgueGen, State) ->
{ok, State};
run(keygen_then_null, KeyGen, _ValgueGen, State) ->
_Prefix = KeyGen(),
{ok, State};
run(seq, KeyGen, _ValgueGen, State) ->
Prefix = KeyGen(),
{_, _} = ?SEQ:seq(?SEQ, Prefix, 1),
{ok, State};
run(seq_direct, KeyGen, _ValgueGen, State) ->
Prefix = KeyGen(),
Name = ?SEQ:make_regname(Prefix),
case get(Name) of
undefined ->
case whereis(Name) of
undefined ->
{_, _} = ?SEQ:seq(?SEQ, Prefix, 1);
Pid ->
put(Name, Pid),
{_, _} = ?SEQ:seq(Pid, Prefix, 1)
end;
Pid ->
{_, _} = ?SEQ:seq(Pid, Prefix, 1)
end,
{ok, State};
run(seq_ets, KeyGen, _ValgueGen, State) ->
Tab = ?MODULE,
Prefix = KeyGen(),
Res = try
BigNum = ets:update_counter(Tab, Prefix, 1),
BigBin = <<BigNum:80/big>>,
<<FileNum:32/big, Offset:48/big>> = BigBin,
%% if Offset rem 1000 == 0 ->
%% io:format("~p,~p ", [FileNum, Offset]);
%% true ->
%% ok
%% end,
{fakefake, FileNum, Offset}
catch error:badarg ->
FileNum2 = 1, Offset2 = 0,
FileBin = <<FileNum2:32/big>>,
OffsetBin = <<Offset2:48/big>>,
Glop = <<FileBin/binary, OffsetBin/binary>>,
<<Base:80/big>> = Glop,
%% if Prefix == <<"42">> -> io:format("base:~w\n", [Base]); true -> ok end,
%% Base = 0,
case ets:insert_new(Tab, {Prefix, Base}) of
true ->
{<<"fakefakefake">>, Base};
false ->
Result2 = ets:update_counter(Tab, Prefix, 1),
{<<"fakefakefake">>, Result2}
end
end,
Res = Res,
{ok, State}.

55
src/machi_sup.erl Normal file
View file

@ -0,0 +1,55 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_sup).
-behaviour(supervisor).
%% API
-export([start_link/0]).
%% Supervisor callbacks
-export([init/1]).
-define(SERVER, ?MODULE).
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
init([]) ->
RestartStrategy = one_for_one,
MaxRestarts = 1000,
MaxSecondsBetweenRestarts = 3600,
SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},
Restart = permanent,
Shutdown = 5000,
Type = supervisor,
ServerSup =
{machi_flu_sup, {machi_flu_sup, start_link, []},
Restart, Shutdown, Type, []},
{ok, {SupFlags, [ServerSup]}}.
%% AChild = {'AName', {'AModule', start_link, []},
%% Restart, Shutdown, Type, ['AModule']},
%% {ok, {SupFlags, [AChild]}}.

180
src/machi_util.erl Normal file
View file

@ -0,0 +1,180 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_util).
-export([
checksum/1,
hexstr_to_bin/1, bin_to_hexstr/1,
hexstr_to_int/1, int_to_hexstr/2, int_to_hexbin/2,
make_binary/1, make_string/1,
make_regname/1,
make_checksum_filename/2, make_data_filename/2,
make_projection_filename/2,
read_max_filenum/2, increment_max_filenum/2,
info_msg/2, verb/1, verb/2,
%% TCP protocol helpers
connect/2
]).
-compile(export_all).
-include("machi.hrl").
-include("machi_projection.hrl").
-include_lib("kernel/include/file.hrl").
append(Server, Prefix, Chunk) when is_binary(Prefix), is_binary(Chunk) ->
CSum = checksum(Chunk),
Server ! {seq_append, self(), Prefix, Chunk, CSum},
receive
{assignment, Offset, File} ->
{Offset, File}
after 10*1000 ->
bummer
end.
make_regname(Prefix) when is_binary(Prefix) ->
erlang:binary_to_atom(Prefix, latin1);
make_regname(Prefix) when is_list(Prefix) ->
erlang:list_to_atom(Prefix).
make_config_filename(DataDir, Prefix) ->
lists:flatten(io_lib:format("~s/config/~s", [DataDir, Prefix])).
make_checksum_filename(DataDir, Prefix, SequencerName, FileNum) ->
lists:flatten(io_lib:format("~s/config/~s.~s.~w.csum",
[DataDir, Prefix, SequencerName, FileNum])).
make_checksum_filename(DataDir, "") ->
lists:flatten(io_lib:format("~s/config", [DataDir]));
make_checksum_filename(DataDir, FileName) ->
lists:flatten(io_lib:format("~s/config/~s.csum", [DataDir, FileName])).
make_data_filename(DataDir, "") ->
FullPath = lists:flatten(io_lib:format("~s/data", [DataDir])),
{"", FullPath};
make_data_filename(DataDir, File) ->
FullPath = lists:flatten(io_lib:format("~s/data/~s", [DataDir, File])),
{File, FullPath}.
make_data_filename(DataDir, Prefix, SequencerName, FileNum) ->
File = erlang:iolist_to_binary(io_lib:format("~s.~s.~w",
[Prefix, SequencerName, FileNum])),
FullPath = lists:flatten(io_lib:format("~s/data/~s", [DataDir, File])),
{File, FullPath}.
make_projection_filename(DataDir, "") ->
lists:flatten(io_lib:format("~s/projection", [DataDir]));
make_projection_filename(DataDir, File) ->
lists:flatten(io_lib:format("~s/projection/~s", [DataDir, File])).
read_max_filenum(DataDir, Prefix) ->
case file:read_file_info(make_config_filename(DataDir, Prefix)) of
{error, enoent} ->
0;
{ok, FI} ->
FI#file_info.size
end.
increment_max_filenum(DataDir, Prefix) ->
try
{ok, FH} = file:open(make_config_filename(DataDir, Prefix), [append]),
ok = file:write(FH, "x"),
%% ok = file:sync(FH),
ok = file:close(FH)
catch
error:{badmatch,_}=Error ->
{error, Error, erlang:get_stacktrace()}
end.
hexstr_to_bin(S) when is_list(S) ->
hexstr_to_bin(S, []);
hexstr_to_bin(B) when is_binary(B) ->
hexstr_to_bin(binary_to_list(B), []).
hexstr_to_bin([], Acc) ->
list_to_binary(lists:reverse(Acc));
hexstr_to_bin([X,Y|T], Acc) ->
{ok, [V], []} = io_lib:fread("~16u", [X,Y]),
hexstr_to_bin(T, [V | Acc]).
bin_to_hexstr(<<>>) ->
[];
bin_to_hexstr(<<X:4, Y:4, Rest/binary>>) ->
[hex_digit(X), hex_digit(Y)|bin_to_hexstr(Rest)].
hex_digit(X) when X < 10 ->
X + $0;
hex_digit(X) ->
X - 10 + $a.
make_binary(X) when is_binary(X) ->
X;
make_binary(X) when is_list(X) ->
iolist_to_binary(X).
make_string(X) when is_list(X) ->
lists:flatten(X);
make_string(X) when is_binary(X) ->
binary_to_list(X).
hexstr_to_int(X) ->
B = hexstr_to_bin(X),
B_size = byte_size(B) * 8,
<<I:B_size/big>> = B,
I.
int_to_hexstr(I, I_size) ->
bin_to_hexstr(<<I:I_size/big>>).
int_to_hexbin(I, I_size) ->
list_to_binary(int_to_hexstr(I, I_size)).
checksum(Bin) when is_binary(Bin) ->
crypto:hash(md5, Bin).
verb(Fmt) ->
verb(Fmt, []).
verb(Fmt, Args) ->
case application:get_env(kernel, verbose) of
{ok, true} -> io:format(Fmt, Args);
_ -> ok
end.
info_msg(Fmt, Args) ->
case application:get_env(kernel, verbose) of {ok, false} -> ok;
_ -> error_logger:info_msg(Fmt, Args)
end.
%%%%%%%%%%%%%%%%%
-spec connect(inet:ip_address() | inet:hostname(), inet:port_number()) ->
port().
connect(Host, Port) ->
escript_connect(Host, Port).
escript_connect(Host, PortStr) when is_list(PortStr) ->
Port = list_to_integer(PortStr),
escript_connect(Host, Port);
escript_connect(Host, Port) when is_integer(Port) ->
{ok, Sock} = gen_tcp:connect(Host, Port, [{active,false}, {mode,binary},
{packet, raw}]),
Sock.

View file

@ -0,0 +1,72 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_admin_util_test).
-compile(export_all).
-ifdef(TEST).
-include("machi.hrl").
-include("machi_projection.hrl").
-define(FLU, machi_flu1).
-define(FLU_C, machi_flu1_client).
verify_file_checksums_test() ->
Host = "localhost",
TcpPort = 32958,
DataDir = "./data",
FLU1 = machi_flu1_test:setup_test_flu(verify1_flu, TcpPort, DataDir),
Sock1 = machi_util:connect(Host, TcpPort),
try
Prefix = <<"verify_prefix">>,
[{ok, _} = ?FLU_C:append_chunk(Sock1, ?DUMMY_PV1_EPOCH,
Prefix, <<X:(X*8)/big>>) ||
X <- lists:seq(1,10)],
{ok, [{_FileSize,File}]} = ?FLU_C:list_files(Sock1, ?DUMMY_PV1_EPOCH),
{ok, []} = machi_admin_util:verify_file_checksums_remote(
Host, TcpPort, ?DUMMY_PV1_EPOCH, File),
{_, Path} = machi_util:make_data_filename(DataDir,binary_to_list(File)),
{ok, FH} = file:open(Path, [read,write]),
{ok, _} = file:position(FH, ?MINIMUM_OFFSET),
ok = file:write(FH, "y"),
ok = file:write(FH, "yo"),
ok = file:write(FH, "yo!"),
ok = file:close(FH),
%% Check the local flavor of the API
{ok, Res1} = machi_admin_util:verify_file_checksums_local(
Host, TcpPort, ?DUMMY_PV1_EPOCH, Path),
3 = length(Res1),
%% Check the remote flavor of the API
{ok, Res2} = machi_admin_util:verify_file_checksums_remote(
Host, TcpPort, ?DUMMY_PV1_EPOCH, File),
3 = length(Res2),
ok
after
catch ?FLU_C:quit(Sock1),
ok = ?FLU:stop(FLU1)
end.
-endif. % TEST

156
test/machi_flu1_test.erl Normal file
View file

@ -0,0 +1,156 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_flu1_test).
-compile(export_all).
-ifdef(TEST).
-include("machi.hrl").
-include("machi_projection.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(FLU, machi_flu1).
-define(FLU_C, machi_flu1_client).
setup_test_flu(RegName, TcpPort, DataDir) ->
setup_test_flu(RegName, TcpPort, DataDir, []).
setup_test_flu(RegName, TcpPort, DataDir, DbgProps) ->
clean_up_data_dir(DataDir),
{ok, FLU1} = ?FLU:start_link([{RegName, TcpPort, DataDir},
{dbg, DbgProps}]),
%% TODO the process structuring/racy-ness of the various processes
%% of the FLU needs to be deterministic to remove this sleep race
%% "prevention".
timer:sleep(10),
FLU1.
flu_smoke_test() ->
Host = "localhost",
TcpPort = 32957,
DataDir = "./data",
Prefix = <<"prefix!">>,
BadPrefix = BadFile = "no/good",
FLU1 = setup_test_flu(smoke_flu, TcpPort, DataDir),
try
{error, no_such_file} = ?FLU_C:checksum_list(Host, TcpPort,
?DUMMY_PV1_EPOCH,
"does-not-exist"),
{error, bad_arg} = ?FLU_C:checksum_list(Host, TcpPort,
?DUMMY_PV1_EPOCH, BadFile),
{ok, []} = ?FLU_C:list_files(Host, TcpPort, ?DUMMY_PV1_EPOCH),
Chunk1 = <<"yo!">>,
{ok, {Off1,Len1,File1}} = ?FLU_C:append_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH,
Prefix, Chunk1),
{ok, Chunk1} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
File1, Off1, Len1),
{ok, [{_,_,_}]} = ?FLU_C:checksum_list(Host, TcpPort,
?DUMMY_PV1_EPOCH, File1),
{error, bad_arg} = ?FLU_C:append_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH,
BadPrefix, Chunk1),
{ok, [{_,File1}]} = ?FLU_C:list_files(Host, TcpPort, ?DUMMY_PV1_EPOCH),
Len1 = size(Chunk1),
{error, no_such_file} = ?FLU_C:read_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH,
File1, Off1*983, Len1),
{error, partial_read} = ?FLU_C:read_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH,
File1, Off1, Len1*984),
Chunk2 = <<"yo yo">>,
Len2 = byte_size(Chunk2),
Off2 = ?MINIMUM_OFFSET + 77,
File2 = "smoke-prefix",
ok = ?FLU_C:write_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
File2, Off2, Chunk2),
{error, bad_arg} = ?FLU_C:write_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
BadFile, Off2, Chunk2),
{ok, Chunk2} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
File2, Off2, Len2),
{error, no_such_file} = ?FLU_C:read_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH,
"no!!", Off2, Len2),
{error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH,
BadFile, Off2, Len2),
%% We know that File1 still exists. Pretend that we've done a
%% migration and exercise the delete_migration() API.
ok = ?FLU_C:delete_migration(Host, TcpPort, ?DUMMY_PV1_EPOCH, File1),
{error, no_such_file} = ?FLU_C:delete_migration(Host, TcpPort,
?DUMMY_PV1_EPOCH, File1),
{error, bad_arg} = ?FLU_C:delete_migration(Host, TcpPort,
?DUMMY_PV1_EPOCH, BadFile),
%% We know that File2 still exists. Pretend that we've done a
%% migration and exercise the trunc_hack() API.
ok = ?FLU_C:trunc_hack(Host, TcpPort, ?DUMMY_PV1_EPOCH, File2),
ok = ?FLU_C:trunc_hack(Host, TcpPort, ?DUMMY_PV1_EPOCH, File2),
{error, bad_arg} = ?FLU_C:trunc_hack(Host, TcpPort,
?DUMMY_PV1_EPOCH, BadFile),
ok = ?FLU_C:quit(machi_util:connect(Host, TcpPort))
after
ok = ?FLU:stop(FLU1)
end.
flu_projection_smoke_test() ->
Host = "localhost",
TcpPort = 32959,
DataDir = "./data",
FLU1 = setup_test_flu(projection_test_flu, TcpPort, DataDir),
try
[begin
{ok, -1} = ?FLU_C:get_latest_epoch(Host, TcpPort, T),
{error, not_written} =
?FLU_C:read_latest_projection(Host, TcpPort, T),
{ok, []} = ?FLU_C:list_all(Host, TcpPort, T),
{ok, []} = ?FLU_C:get_all(Host, TcpPort, T),
P1 = machi_projection:new(1, a, [a], [], [a], [], []),
ok = ?FLU_C:write_projection(Host, TcpPort, T, P1),
{error, written} = ?FLU_C:write_projection(Host, TcpPort, T, P1),
{ok, P1} = ?FLU_C:read_projection(Host, TcpPort, T, 1),
{ok, P1} = ?FLU_C:read_latest_projection(Host, TcpPort, T),
{ok, [1]} = ?FLU_C:list_all(Host, TcpPort, T),
{ok, [P1]} = ?FLU_C:get_all(Host, TcpPort, T),
{error, not_written} = ?FLU_C:read_projection(Host, TcpPort, T, 2)
end || T <- [public, private] ]
after
ok = ?FLU:stop(FLU1)
end.
clean_up_data_dir(DataDir) ->
[begin
Fs = filelib:wildcard(DataDir ++ Glob),
[file:delete(F) || F <- Fs],
[file:del_dir(F) || F <- Fs]
end || Glob <- ["*/*/*/*", "*/*/*", "*/*", "*"] ],
_ = file:del_dir(DataDir),
ok.
-endif. % TEST

View file

@ -0,0 +1,77 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_projection_test).
-ifdef(TEST).
-compile(export_all).
-include("machi_projection.hrl").
new_test() ->
%% Bleh, hey QuickCheck ... except that any model probably equals
%% code under test, bleh.
true = try_it(a, [a,b,c], [a,b], [], [c], []),
true = try_it(<<"a">>, [<<"a">>,b,c], [<<"a">>,b], [], [c], []),
Servers = [#p_srvr{name=a}, #p_srvr{name=b}, #p_srvr{name=c}],
Servers_bad1 = [#p_srvr{name= <<"a">>}, #p_srvr{name=b}, #p_srvr{name=c}],
Servers_bad2 = [#p_srvr{name=z}, #p_srvr{name=b}, #p_srvr{name=c}],
true = try_it(a, Servers, [a,b], [], [c], []),
false = try_it(a, not_list, [a,b], [], [c], []),
false = try_it(a, [a,b,c], not_list, [], [c], []),
false = try_it(a, [a,b,c], [a,b], not_list, [c], []),
false = try_it(a, [a,b,c], [a,b], [], not_list, []),
false = try_it(a, [a,b,c], [a,b], [], [c], not_list),
false = try_it(<<"x">>, [a,b,c], [a,b], [], [c], []),
false = try_it(a, [a,b,c], [a,b,c], [], [c], []),
false = try_it(a, [a,b,c], [a,b], [c], [c], []),
false = try_it(a, [a,b,c], [a,b], [], [c,c], []),
false = try_it(a, Servers_bad1, [a,b], [], [c], []),
false = try_it(a, Servers_bad2, [a,b], [], [c], []),
ok.
compare_test() ->
P0 = machi_projection:new(0, a, [a,b,c], [a,b], [], [c], []),
P1a = machi_projection:new(1, a, [a,b,c], [a,b], [], [c], []),
P1b = machi_projection:new(1, b, [a,b,c], [a,b], [], [c], []),
P2 = machi_projection:new(2, a, [a,b,c], [a,b], [], [c], []),
0 = machi_projection:compare(P0, P0),
-1 = machi_projection:compare(P0, P1a),
-1 = machi_projection:compare(P1a, P1b),
-1 = machi_projection:compare(P1b, P1a),
1 = machi_projection:compare(P2, P1a),
1 = machi_projection:compare(P2, P1b),
1 = machi_projection:compare(P2, P0),
ok.
try_it(MyName, All_list, UPI_list, Down_list, Repairing_list, Ps) ->
try
P = machi_projection:new(MyName, All_list, UPI_list, Down_list,
Repairing_list, Ps),
is_record(P, projection_v1)
catch _:_ ->
false
end.
-endif. % TEST

View file

@ -0,0 +1,154 @@
%% -------------------------------------------------------------------
%%
%% Machi: a small village of replicated files
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%%% File : handle_errors.erl
%%% Author : Ulf Norell
%%% Description :
%%% Created : 26 Mar 2012 by Ulf Norell
-module(event_logger).
-compile(export_all).
-behaviour(gen_server).
%% API
-export([start_link/0, event/1, event/2, get_events/0, start_logging/0]).
-export([timestamp/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-record(state, { start_time, events = [] }).
-record(event, { timestamp, data }).
%%====================================================================
%% API
%%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server
%%--------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
start_logging() ->
gen_server:call(?MODULE, {start, timestamp()}).
event(EventData) ->
event(EventData, timestamp()).
event(EventData, Timestamp) ->
gen_server:call(?MODULE,
#event{ timestamp = Timestamp, data = EventData }).
async_event(EventData) ->
gen_server:cast(?MODULE,
#event{ timestamp = timestamp(), data = EventData }).
get_events() ->
gen_server:call(?MODULE, get_events).
%%====================================================================
%% gen_server callbacks
%%====================================================================
%%--------------------------------------------------------------------
%% Function: init(Args) -> {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% Description: Initiates the server
%%--------------------------------------------------------------------
init([]) ->
{ok, #state{}}.
%%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) ->
%% {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} |
%% {stop, Reason, State}
%% Description: Handling call messages
%%--------------------------------------------------------------------
handle_call(Event = #event{}, _From, State) ->
{reply, ok, add_event(Event, State)};
handle_call({start, Now}, _From, S) ->
{reply, ok, S#state{ events = [], start_time = Now }};
handle_call(get_events, _From, S) ->
{reply, lists:reverse([ {E#event.timestamp, E#event.data} || E <- S#state.events]),
S#state{ events = [] }};
handle_call(Request, _From, State) ->
{reply, {error, {bad_call, Request}}, State}.
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% Description: Handling cast messages
%%--------------------------------------------------------------------
handle_cast(Event = #event{}, State) ->
{noreply, add_event(Event, State)};
handle_cast(_Msg, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% Description: Handling all non call/cast messages
%%--------------------------------------------------------------------
handle_info(_Info, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> void()
%% Description: This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any necessary
%% cleaning up. When it returns, the gen_server terminates with Reason.
%% The return value is ignored.
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
ok.
%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Convert process state when code is changed
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
add_event(#event{timestamp = Now, data = Data}, State) ->
Event = #event{ timestamp = Now, data = Data },
State#state{ events = [Event|State#state.events] }.
timestamp() ->
lamport_clock:get().

View file

@ -0,0 +1,174 @@
%% -------------------------------------------------------------------
%%
%% Machi: a small village of replicated files
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%%%-------------------------------------------------------------------
%%% @author Hans Svensson <>
%%% @copyright (C) 2012, Hans Svensson
%%% @doc
%%%
%%% @end
%%% Created : 19 Mar 2012 by Hans Svensson <>
%%%-------------------------------------------------------------------
-module(handle_errors).
-behaviour(gen_event).
%% API
-export([start_link/0, add_handler/0]).
%% gen_event callbacks
-export([init/1, handle_event/2, handle_call/2,
handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-record(state, { errors = [] }).
%%%===================================================================
%%% gen_event callbacks
%%%===================================================================
%%--------------------------------------------------------------------
%% @doc
%% Creates an event manager
%%
%% @spec start_link() -> {ok, Pid} | {error, Error}
%% @end
%%--------------------------------------------------------------------
start_link() ->
gen_event:start_link({local, ?SERVER}).
%%--------------------------------------------------------------------
%% @doc
%% Adds an event handler
%%
%% @spec add_handler() -> ok | {'EXIT', Reason} | term()
%% @end
%%--------------------------------------------------------------------
add_handler() ->
gen_event:add_handler(?SERVER, ?MODULE, []).
%%%===================================================================
%%% gen_event callbacks
%%%===================================================================
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever a new event handler is added to an event manager,
%% this function is called to initialize the event handler.
%%
%% @spec init(Args) -> {ok, State}
%% @end
%%--------------------------------------------------------------------
init([]) ->
{ok, #state{}}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever an event manager receives an event sent using
%% gen_event:notify/2 or gen_event:sync_notify/2, this function is
%% called for each installed event handler to handle the event.
%%
%% @spec handle_event(Event, State) ->
%% {ok, State} |
%% {swap_handler, Args1, State1, Mod2, Args2} |
%% remove_handler
%% @end
%%--------------------------------------------------------------------
handle_event({error, _, {_, "Hintfile '~s' has bad CRC" ++ _, _}}, State) ->
{ok, State};
handle_event({error, _, {_, "** Generic server" ++ _, _}}, State) ->
{ok, State};
handle_event({error, _, {_, "Failed to merge ~p: ~p\n", [_, not_ready]}}, State) ->
{ok, State};
handle_event({error, _, {_, "Failed to merge ~p: ~p\n", [_, {merge_locked, _, _}]}}, State) ->
{ok, State};
handle_event({error, _, {_, "Failed to read lock data from ~s: ~p\n", [_, {invalid_data, <<>>}]}}, State) ->
{ok, State};
handle_event({error, _, Event}, State) ->
{ok, State#state{ errors = [Event|State#state.errors] }};
handle_event(_Event, State) ->
{ok, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever an event manager receives a request sent using
%% gen_event:call/3,4, this function is called for the specified
%% event handler to handle the request.
%%
%% @spec handle_call(Request, State) ->
%% {ok, Reply, State} |
%% {swap_handler, Reply, Args1, State1, Mod2, Args2} |
%% {remove_handler, Reply}
%% @end
%%--------------------------------------------------------------------
handle_call(get_errors, S) ->
{ok, S#state.errors, S#state{ errors = [] }};
handle_call(_Request, State) ->
Reply = ok,
{ok, Reply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% This function is called for each installed event handler when
%% an event manager receives any other message than an event or a
%% synchronous request (or a system message).
%%
%% @spec handle_info(Info, State) ->
%% {ok, State} |
%% {swap_handler, Args1, State1, Mod2, Args2} |
%% remove_handler
%% @end
%%--------------------------------------------------------------------
handle_info(_Info, State) ->
{ok, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever an event handler is deleted from an event manager, this
%% function is called. It should be the opposite of Module:init/1 and
%% do any necessary cleaning up.
%%
%% @spec terminate(Reason, State) -> void()
%% @end
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
ok.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Convert process state when code is changed
%%
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
%% @end
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================

View file

@ -0,0 +1,73 @@
%% -------------------------------------------------------------------
%%
%% Machi: a small village of replicated files
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(lamport_clock).
-export([init/0, reset/0, get/0, update/1, incr/0]).
-define(KEY, ?MODULE).
-ifdef(TEST).
init() ->
case get(?KEY) of
undefined ->
reset();
N when is_integer(N) ->
ok
end.
reset() ->
FakeTOD = 0,
put(?KEY, FakeTOD + 1).
get() ->
init(),
get(?KEY).
update(Remote) ->
New = erlang:max(get(?KEY), Remote) + 1,
put(?KEY, New),
New.
incr() ->
New = get(?KEY) + 1,
put(?KEY, New),
New.
-else. % TEST
init() ->
ok.
reset() ->
ok.
get() ->
ok.
update(_) ->
ok.
incr() ->
ok.
-endif. % TEST