diff --git a/src/machi_flu_filename_mgr.erl b/src/machi_flu_filename_mgr.erl index 9ceea0a..65a0a93 100644 --- a/src/machi_flu_filename_mgr.erl +++ b/src/machi_flu_filename_mgr.erl @@ -97,11 +97,10 @@ start_link(FluName, DataDir) when is_atom(FluName) andalso is_list(DataDir) -> % tuple in the form of `{file, F}' or an `{error, Reason}' find_or_make_filename_from_prefix(FluName, EpochId, {prefix, Prefix}, - {coc, CoC_Namespace, CoC_Locator}) + {coc, _CoC_Ns, _CoC_Loc}=CoC_NL) when is_atom(FluName) -> - io:format(user, "TODO: CoC_Namespace, CoC_Locator ~p ~p\n", [CoC_Namespace, CoC_Locator]), N = make_filename_mgr_name(FluName), - gen_server:call(N, {find_filename, EpochId, Prefix}, ?TIMEOUT); + gen_server:call(N, {find_filename, EpochId, CoC_NL, Prefix}, ?TIMEOUT); find_or_make_filename_from_prefix(_FluName, _EpochId, Other, Other2) -> lager:error("~p is not a valid prefix/CoC ~p", [Other, Other2]), error(badarg). @@ -143,19 +142,19 @@ handle_cast(Req, State) -> %% the FLU has already validated that the caller's epoch id and the FLU's epoch id %% are the same. So we *assume* that remains the case here - that is to say, we %% are not wedged. -handle_call({find_filename, EpochId, Prefix}, _From, S = #state{ datadir = DataDir, - epoch = EpochId, - tid = Tid}) -> +handle_call({find_filename, EpochId, CoC_NL, Prefix}, _From, S = #state{ datadir = DataDir, + epoch = EpochId, + tid = Tid }) -> %% Our state and the caller's epoch ids are the same. Business as usual. - File = handle_find_file(Tid, Prefix, DataDir), + File = handle_find_file(Tid, CoC_NL, Prefix, DataDir), {reply, {file, File}, S}; -handle_call({find_filename, EpochId, Prefix}, _From, S = #state{ datadir = DataDir, tid = Tid }) -> +handle_call({find_filename, EpochId, CoC_NL, Prefix}, _From, S = #state{ datadir = DataDir, tid = Tid }) -> %% If the epoch id in our state and the caller's epoch id were the same, it would've %% matched the above clause. Since we're here, we know that they are different. %% If epoch ids between our state and the caller's are different, we must increment the %% sequence number, generate a filename and then cache it. - File = increment_and_cache_filename(Tid, DataDir, Prefix), + File = increment_and_cache_filename(Tid, DataDir, CoC_NL, Prefix), {reply, {file, File}, S#state{epoch = EpochId}}; handle_call({increment_sequence, Prefix}, _From, S = #state{ datadir = DataDir }) -> @@ -203,11 +202,11 @@ list_files(DataDir, Prefix) -> make_filename_mgr_name(FluName) when is_atom(FluName) -> list_to_atom(atom_to_list(FluName) ++ "_filename_mgr"). -handle_find_file(Tid, Prefix, DataDir) -> - N = machi_util:read_max_filenum(DataDir, Prefix), +handle_find_file(Tid, {coc,CoC_Namespace,CoC_Locator}, Prefix, DataDir) -> + N = machi_util:read_max_filenum(DataDir, CoC_Namespace, CoC_Locator, Prefix), {File, Cleanup} = case find_file(DataDir, Prefix, N) of [] -> - {find_or_make_filename(Tid, DataDir, Prefix, N), false}; + {find_or_make_filename(Tid, DataDir, CoC_Namespace, CoC_Locator, Prefix, N), false}; [H] -> {H, true}; [Fn | _ ] = L -> lager:debug( @@ -218,20 +217,20 @@ handle_find_file(Tid, Prefix, DataDir) -> maybe_cleanup(Tid, {Prefix, N}, Cleanup), filename:basename(File). -find_or_make_filename(Tid, DataDir, Prefix, N) -> - case ets:lookup(Tid, {Prefix, N}) of +find_or_make_filename(Tid, DataDir, CoC_Namespace, CoC_Locator, Prefix, N) -> + case ets:lookup(Tid, {CoC_Namespace, CoC_Locator, Prefix, N}) of [] -> - F = generate_filename(DataDir, Prefix, N), - true = ets:insert_new(Tid, {{Prefix, N}, F}), + F = generate_filename(DataDir, CoC_Namespace, CoC_Locator, Prefix, N), + true = ets:insert_new(Tid, {{CoC_Namespace, CoC_Locator, Prefix, N}, F}), F; [{_Key, File}] -> File end. -generate_filename(DataDir, Prefix, N) -> +generate_filename(DataDir, CoC_Namespace, CoC_Locator, Prefix, N) -> {F, _} = machi_util:make_data_filename( DataDir, - Prefix, + CoC_Namespace, CoC_Locator, Prefix, generate_uuid_v4_str(), N), binary_to_list(F). @@ -241,11 +240,11 @@ maybe_cleanup(_Tid, _Key, false) -> maybe_cleanup(Tid, Key, true) -> true = ets:delete(Tid, Key). -increment_and_cache_filename(Tid, DataDir, Prefix) -> - ok = machi_util:increment_max_filenum(DataDir, Prefix), - N = machi_util:read_max_filenum(DataDir, Prefix), - F = generate_filename(DataDir, Prefix, N), - true = ets:insert_new(Tid, {{Prefix, N}, F}), +increment_and_cache_filename(Tid, DataDir, {coc,CoC_Namespace,CoC_Locator}, Prefix) -> + ok = machi_util:increment_max_filenum(DataDir, CoC_Namespace, CoC_Locator, Prefix), + N = machi_util:read_max_filenum(DataDir, CoC_Namespace, CoC_Locator, Prefix), + F = generate_filename(DataDir, CoC_Namespace, CoC_Locator, Prefix, N), + true = ets:insert_new(Tid, {{CoC_Namespace, CoC_Locator, Prefix, N}, F}), filename:basename(F). diff --git a/src/machi_util.erl b/src/machi_util.erl index 8aa1972..83b4e6e 100644 --- a/src/machi_util.erl +++ b/src/machi_util.erl @@ -30,13 +30,13 @@ hexstr_to_int/1, int_to_hexstr/2, int_to_hexbin/2, make_binary/1, make_string/1, make_regname/1, - make_config_filename/2, + make_config_filename/4, make_checksum_filename/4, make_checksum_filename/2, - make_data_filename/4, make_data_filename/2, + make_data_filename/6, make_data_filename/2, make_projection_filename/2, is_valid_filename/1, parse_filename/1, - read_max_filenum/2, increment_max_filenum/2, + read_max_filenum/4, increment_max_filenum/4, info_msg/2, verb/1, verb/2, mbytes/1, pretty_time/0, pretty_time/2, @@ -68,10 +68,12 @@ make_regname(Prefix) when is_list(Prefix) -> %% @doc Calculate a config file path, by common convention. --spec make_config_filename(string(), string()) -> +-spec make_config_filename(string(), riak_dt:coc_namespace(), riak_dt:coc_locator(), string()) -> string(). -make_config_filename(DataDir, Prefix) -> - lists:flatten(io_lib:format("~s/config/~s", [DataDir, Prefix])). +make_config_filename(DataDir, CoC_Namespace, CoC_Locator, Prefix) -> + Locator_str = int_to_hexstr(CoC_Locator, 32), + lists:flatten(io_lib:format("~s/config/~s^~s^~s", + [DataDir, Prefix, CoC_Namespace, Locator_str])). %% @doc Calculate a checksum file path, by common convention. @@ -92,17 +94,19 @@ make_checksum_filename(DataDir, FileName) -> %% @doc Calculate a file data file path, by common convention. --spec make_data_filename(string(), string(), atom()|string()|binary(), integer()|string()) -> +-spec make_data_filename(string(), riak_dt:coc_namespace(), riak_dt:coc_locator(), string(), atom()|string()|binary(), integer()|string()) -> {binary(), string()}. -make_data_filename(DataDir, Prefix, SequencerName, FileNum) +make_data_filename(DataDir, CoC_Namespace, CoC_Locator, Prefix, SequencerName, FileNum) when is_integer(FileNum) -> - File = erlang:iolist_to_binary(io_lib:format("~s^~s^~w", - [Prefix, SequencerName, FileNum])), + Locator_str = int_to_hexstr(CoC_Locator, 32), + File = erlang:iolist_to_binary(io_lib:format("~s^~s^~s^~s^~w", + [Prefix, CoC_Namespace, Locator_str, SequencerName, FileNum])), make_data_filename2(DataDir, File); -make_data_filename(DataDir, Prefix, SequencerName, String) +make_data_filename(DataDir, CoC_Namespace, CoC_Locator, Prefix, SequencerName, String) when is_list(String) -> - File = erlang:iolist_to_binary(io_lib:format("~s^~s^~s", - [Prefix, SequencerName, string])), + Locator_str = int_to_hexstr(CoC_Locator, 32), + File = erlang:iolist_to_binary(io_lib:format("~s^~s^~s^~s^~s", + [Prefix, CoC_Namespace, Locator_str, SequencerName, string])), make_data_filename2(DataDir, File). make_data_filename2(DataDir, File) -> @@ -158,10 +162,10 @@ parse_filename(Filename) -> %% @doc Read the file size of a config file, which is used as the %% basis for a minimum sequence number. --spec read_max_filenum(string(), string()) -> +-spec read_max_filenum(string(), riak_dt:coc_namespace(), riak_dt:coc_locator(), string()) -> non_neg_integer(). -read_max_filenum(DataDir, Prefix) -> - case file:read_file_info(make_config_filename(DataDir, Prefix)) of +read_max_filenum(DataDir, CoC_Namespace, CoC_Locator, Prefix) -> + case file:read_file_info(make_config_filename(DataDir, CoC_Namespace, CoC_Locator, Prefix)) of {error, enoent} -> 0; {ok, FI} -> @@ -171,11 +175,11 @@ read_max_filenum(DataDir, Prefix) -> %% @doc Increase the file size of a config file, which is used as the %% basis for a minimum sequence number. --spec increment_max_filenum(string(), string()) -> +-spec increment_max_filenum(string(), riak_dt:coc_namespace(), riak_dt:coc_locator(), string()) -> ok | {error, term()}. -increment_max_filenum(DataDir, Prefix) -> +increment_max_filenum(DataDir, CoC_Namespace, CoC_Locator, Prefix) -> try - {ok, FH} = file:open(make_config_filename(DataDir, Prefix), [append]), + {ok, FH} = file:open(make_config_filename(DataDir, CoC_Namespace, CoC_Locator, Prefix), [append]), ok = file:write(FH, "x"), ok = file:sync(FH), ok = file:close(FH)