diff --git a/.gitignore b/.gitignore
index ea5f7d3..38386d3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,3 +19,4 @@ _tdeps
 logs
 erln8.config
 .local_dialyzer_plt
+deps/*
diff --git a/.travis.yml b/.travis.yml
index 054cd2a..393656f 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,16 +1,14 @@
 language: erlang
 otp_release:
-  - 18.0
+  - 18.1
   - 17.4
-  - 17.2
   - R16B03
-  - R16B02
-  - R16B01
-  - R15B03
 script:
   - make dialyzer
   - make xref
   - make
   - make test
 notifications:
+  email: christopher.meiklejohn@gmail.com
   email: tom@helium.com
+sudo: false
diff --git a/LICENSE b/LICENSE
index d0a6050..031b433 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,3 +1,4 @@
+Copyright (c) 2015, Christopher Meiklejohn
 Copyright (c) 2015, Basho Technologies, Inc
 Copyright (c) 2015, Helium Systems, Inc
 All rights reserved.
diff --git a/Makefile b/Makefile
index 0b48c23..38a9b4a 100644
--- a/Makefile
+++ b/Makefile
@@ -1,7 +1,7 @@
-.PHONY: deps compile rel
+REBAR = $(shell pwd)/rebar3
+.PHONY: deps compile rel test
 
 DIALYZER_APPS = kernel stdlib erts sasl eunit syntax_tools compiler crypto
-REBAR="./rebar3"
 DEP_DIR="_build/lib"
 
 all: compile
@@ -11,14 +11,16 @@ include tools.mk
 
 test: common_test
 
 common_test:
-	./rebar3 ct
+	$(REBAR) ct
 
-compile: deps
-	./rebar3 compile
+compile:
+	$(REBAR) compile
 
 rel:
-	./rebar3 release
+	$(REBAR) release
 
 stage:
-	./rebar3 release -d
+	$(REBAR) release -d
+dialyzer:
+	$(REBAR) dialyzer
diff --git a/README.md b/README.md
index d9c2673..1243805 100644
--- a/README.md
+++ b/README.md
@@ -1,25 +1,13 @@
-plumtree
-=====
+Plumtree
+=======================================================
 
-Plumtree is an implementation of Plumtree[1], the epidemic broadcast protocol.
-It is extracted from the implementation in riak_core[2]. Instead of
-the riak_core ring and riak's ring gossip protocol, it includes a standalone
-membership gossip, built around riak_dt[3]'s ORSWOT[4].
+[![Build Status](https://travis-ci.org/helium/plumtree.svg?branch=master)](https://travis-ci.org/helium/plumtree)
 
-More information on the plumtree protocol and it's history we encourage you
-to watch Jordan West's RICON West 2013 talk[5] and Joao Leitao & Jordan West's
-RICON 2014 talk[6].
+Plumtree is an implementation of [Plumtree](http://www.gsd.inesc-id.pt/~jleitao/pdf/srds07-leitao.pdf), the epidemic broadcast protocol. It is extracted from the implementation in [Riak Core](https://github.com/basho/riak_core). Instead of the riak_core ring and riak's ring gossip protocol, it includes a standalone membership gossip, built around the [Riak DT](https://github.com/basho/riak_dt) [ORSWOT](http://haslab.uminho.pt/cbm/files/1210.3368v1.pdf).
 
-A special thanks to Jordan, Joao and the team at Basho for providing much of
-the code contained in this library.
-
-1. http://www.gsd.inesc-id.pt/~jleitao/pdf/srds07-leitao.pdf
-2. https://github.com/basho/riak_core
-3. https://github.com/basho/riak_dt
-4. http://haslab.uminho.pt/cbm/files/1210.3368v1.pdf
-5. https://www.youtube.com/watch?v=s4cCUTPU8GI
-6. https://www.youtube.com/watch?v=bo367a6ZAwM
+For more information on the plumtree protocol and its history, we encourage you to watch Jordan West's [RICON West 2013 talk](https://www.youtube.com/watch?v=s4cCUTPU8GI) and Joao Leitao & Jordan West's [RICON 2014 talk](https://www.youtube.com/watch?v=bo367a6ZAwM).
+A special thanks to Jordan, Joao and the team at Basho for providing much of the code contained in this library.
 
 Build
 -----
@@ -36,14 +24,12 @@ Testing
 
 Contributing
 ----
-Contributions from the community are encouraged. This project follows the
-git-flow workflow. If you want to contribute:
+Contributions from the community are encouraged. This project follows the git-flow workflow. If you want to contribute:
 
-* Fork this repository
+* Fork this repository 
 * Make your changes and run the full test suite
 * Please include any additional tests for any additional code added
 * Commit your changes and push them to your fork
 * Open a pull request
 
-We will review your changes, make appropriate suggestions and/or provide
-feedback, and merge your changes when ready.
+We will review your changes, make appropriate suggestions and/or provide feedback, and merge your changes when ready.
diff --git a/include/plumtree.hrl b/include/plumtree.hrl
new file mode 100644
index 0000000..2fe7fec
--- /dev/null
+++ b/include/plumtree.hrl
@@ -0,0 +1 @@
+-define(SET, riak_dt_orswot).
diff --git a/rebar.config b/rebar.config
index a30845e..03db3d0 100644
--- a/rebar.config
+++ b/rebar.config
@@ -1,12 +1,17 @@
-{erl_opts, [warnings_as_errors, debug_info, {parse_transform, lager_transform}]}.
 {deps, [
-  {lager, {git, "git://github.com/basho/lager.git", {tag, "2.1.1"}}},
-  {riak_dt, {git, "git://github.com/basho/riak_dt.git", {tag, "2.1.0"}}},
-  {eleveldb, {git, "git://github.com/helium/eleveldb.git", {branch, "adt-helium"}}}
+  {lasp_support, ".*", {git, "git://github.com/lasp-lang/lasp_support.git", {branch, "master"}}},
+  {time_compat, ".*", {git, "git://github.com/lasp-lang/time_compat.git", {branch, "master"}}},
+  {lager, ".*", {git, "git://github.com/basho/lager.git", {tag, "2.1.1"}}},
+  {riak_dt, ".*", {git, "git://github.com/basho/riak_dt.git", {tag, "develop"}}},
+  {eleveldb, ".*", {git, "git://github.com/lasp-lang/eleveldb.git", {branch, "develop"}}}
 ]}.
 
 {dialyzer_base_plt_apps, [kernel, stdlib, erts, sasl, eunit, syntax_tools, compiler, crypto]}.
-
 {xref_checks, [undefined_function_calls]}.
-
+{erl_opts, [debug_info,
+            warnings_as_errors,
+            {platform_define, "^[0-9]+", namespaced_types},
+            {parse_transform, lager_transform}]}.
 {cover_enabled, true}.
+{eunit_opts, [verbose, {report,{eunit_surefire,[{dir,"."}]}}]}.
+{edoc_opts, [{preprocess, true}]}.
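The new `include/plumtree.hrl` defines `?SET` as `riak_dt_orswot`, the CRDT that the README above describes as the basis for the standalone membership gossip, and the updated `rebar.config` pulls in the matching `riak_dt` dependency. As a minimal sketch only (not part of this diff) of how code that includes the header can manipulate a membership set through the macro, assuming riak_dt's standard CRDT API (`new/0`, `update/3`, `merge/2`, `value/1`); the module and `join/2` function are hypothetical:

```erlang
%% Hypothetical illustration, not code from this change.
-module(membership_sketch).
-include("plumtree.hrl").
-export([join/2]).

%% Add the local node to the membership ORSWOT, then merge with a
%% remote replica's set; ORSWOT merges commute, so replicas converge.
join(Node, RemoteSet) ->
    {ok, Local} = ?SET:update({add, Node}, Node, ?SET:new()),
    Merged = ?SET:merge(Local, RemoteSet),
    ?SET:value(Merged).
```

Because the ORSWOT is add-wins and merge is conflict-free, membership can be gossiped without riak_core's ring or its gossip protocol, which is the point of the swap the README describes.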
diff --git a/rebar.lock b/rebar.lock
index 4c25d3a..5da8367 100644
--- a/rebar.lock
+++ b/rebar.lock
@@ -1,16 +1,36 @@
-[{<<"eleveldb">>,
-  {git,"git://github.com/helium/eleveldb.git",
-   {ref,"4e199ab1518060d348ae0f9719d2cbf9f098e00d"}},
-  0},
+[{<<"neotoma">>,
+  {git,"git://github.com/seancribbs/neotoma.git",
+   {ref,"6ac6007c7713712a80096de5c7cdf5b85b01374e"}},
+  2},
+ {<<"getopt">>,
+  {git,"git://github.com/jcomellas/getopt.git",
+   {ref,"388dc95caa7fb97ec7db8cfc39246a36aba61bd8"}},
+  2},
  {<<"goldrush">>,
   {git,"git://github.com/DeadZen/goldrush.git",
    {ref,"71e63212f12c25827e0c1b4198d37d5d018a7fec"}},
   1},
+ {<<"cuttlefish">>,
+  {git,"git://github.com/basho/cuttlefish.git",
+   {ref,"68d54f6b7f0e1e1528fc6ae8555c424d12a31d68"}},
+  1},
+ {<<"time_compat">>,
+  {git,"git://github.com/lasp-lang/time_compat.git",
+   {ref,"adfae4409187cc1a9f79028986e92e8730b5eda5"}},
+  0},
+ {<<"riak_dt">>,
+  {git,"git://github.com/basho/riak_dt.git",
+   {ref,"a2986bccd1cc42facdfe739495c6d13762ae0f37"}},
+  0},
+ {<<"lasp_support">>,
+  {git,"git://github.com/lasp-lang/lasp_support.git",
+   {ref,"f682f79801d3573db86bc55d849529a2a98edcfc"}},
+  0},
  {<<"lager">>,
   {git,"git://github.com/basho/lager.git",
    {ref,"d33ccf3b69de09a628fe38b4d7981bb8671b8a4f"}},
   0},
- {<<"riak_dt">>,
-  {git,"git://github.com/basho/riak_dt.git",
-   {ref,"f7981d4ad7407ddc085f133f204dd71bf9d50c56"}},
+ {<<"eleveldb">>,
+  {git,"git://github.com/lasp-lang/eleveldb.git",
+   {ref,"f33872c3c377015d1b282f956e98938528c901b0"}},
   0}].
diff --git a/src/app_helper.erl b/src/app_helper.erl
deleted file mode 100644
index f90490b..0000000
--- a/src/app_helper.erl
+++ /dev/null
@@ -1,151 +0,0 @@
-%% -------------------------------------------------------------------
-%%
-%% riak_core: Core Riak Application
-%%
-%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved.
-%%
-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License. You may obtain
-%% a copy of the License at
-%%
-%%   http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied. See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-%%
-%% -------------------------------------------------------------------
-
--module(app_helper).
-
--export([get_env/1,
-         get_env/2,
-         get_env/3,
-         get_prop_or_env/3,
-         get_prop_or_env/4,
-         try_envs/1,
-         try_envs/2]).
-
--ifdef(TEST).
--include_lib("eunit/include/eunit.hrl").
--endif.
-
-%% ===================================================================
-%% Public API
-%% ===================================================================
-
-%% @spec get_env(App :: atom()) -> [{Key :: atom(), Value :: term()}]
-%% @doc Retrieve all Key/Value pairs in the env for the specified app.
-get_env(App) ->
-    application:get_all_env(App).
-
-%% @spec get_env(App :: atom(), Key :: atom()) -> term()
-%% @doc The official way to get a value from the app's env.
-%%      Will return the 'undefined' atom if that key is unset.
-get_env(App, Key) ->
-    get_env(App, Key, undefined).
-
-%% @spec get_env(App :: atom(), Key :: atom(), Default :: term()) -> term()
-%% @doc The official way to get a value from this application's env.
-%%      Will return Default if that key is unset.
-get_env(App, Key, Default) ->
-    case application:get_env(App, Key) of
-        {ok, Value} ->
-            Value;
-        _ ->
-            Default
-    end.
-
-%% @doc Retrieve value for Key from Properties if it exists, otherwise
-%%      return from the application's env.
--spec get_prop_or_env(atom(), [{atom(), term()}], atom()) -> term().
-get_prop_or_env(Key, Properties, App) ->
-    get_prop_or_env(Key, Properties, App, undefined).
-
-%% @doc Return the value for Key in Properties if it exists, otherwise return
-%%      the value from the application's env, or Default.
--spec get_prop_or_env(atom(), [{atom(), term()}], atom(), term()) -> term().
-get_prop_or_env(Key, Properties, App, Default) ->
-    case proplists:get_value(Key, Properties) of
-        undefined ->
-            get_env(App, Key, Default);
-        Value ->
-            Value
-    end.
-
-%% @doc Like `get_env' but try multiple `{App, Key}' combos before
-%%      returning `{default, Default}'. The return value is `{App,
-%%      Key, Value}' so that the caller may distinguish where the
-%%      value came from. This is useful for scenarios when the config
-%%      app/key has changed between releases and you need to check for
-%%      both.
--spec try_envs([{atom(), atom()}], term()) -> {atom(), atom(), term()}.
-try_envs([{App, Key}|T], Default) ->
-    case get_env(App, Key) of
-        undefined ->
-            try_envs(T, Default);
-        Value ->
-            {App, Key, Value}
-    end;
-try_envs([], Default) ->
-    {default, Default}.
-
-try_envs(Pairs) ->
-    try_envs(Pairs, undefined).
-
-%% ===================================================================
-%% EUnit tests
-%% ===================================================================
--ifdef(TEST).
-
-app_helper_test_() ->
-    { setup,
-      fun setup/0,
-      fun cleanup/1,
-      [
-       fun get_prop_or_env_default_value_test_case/0,
-       fun get_prop_or_env_undefined_value_test_case/0,
-       fun get_prop_or_env_from_env_test_case/0,
-       fun get_prop_or_env_from_prop_test_case/0,
-       fun get_prop_or_env_from_prop_with_default_test_case/0,
-       fun try_envs_test_case/0
-      ]
-    }.
-
-setup() ->
-    application:set_env(bogus_app, envkeyone, value),
-    application:set_env(bogus_app, envkeytwo, valuetwo).
-
-cleanup(_Ctx) ->
-    ok.
-
-get_prop_or_env_default_value_test_case() ->
-    ?assertEqual(default, get_prop_or_env(key, [], bogus, default)).
-
-get_prop_or_env_undefined_value_test_case() ->
-    ?assertEqual(undefined, get_prop_or_env(key, [], bogus)).
-
-get_prop_or_env_from_env_test_case() ->
-    ?assertEqual(value, get_prop_or_env(envkeyone, [], bogus_app)).
-
-get_prop_or_env_from_prop_test_case() ->
-    Properties = [{envkeyone, propvalue}],
-    ?assertEqual(propvalue, get_prop_or_env(envkeyone, Properties, bogus_app)).
-
-get_prop_or_env_from_prop_with_default_test_case() ->
-    Properties = [{envkeyone, propvalue}],
-    ?assertEqual(propvalue, get_prop_or_env(envkeyone, Properties, bogus_app, default)).
-
-try_envs_test_case() ->
-    Val = try_envs([{noapp, nokey}, {bogus_app, envkeyone}], failed),
-    ?assertEqual({bogus_app, envkeyone, value}, Val),
-    Val2 = try_envs([{bogus_app, envkeytwo}, {noapp, nokey}], failed),
-    ?assertEqual({bogus_app, envkeytwo, valuetwo}, Val2),
-    Val3 = try_envs([{noapp, nokey}, {blah, blah}], default),
-    ?assertEqual({default, default}, Val3).
-
--endif.
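With `app_helper.erl` deleted outright, any remaining call sites need a replacement. A possible migration sketch, assuming plain OTP suffices: `application:get_env/3` (available in all OTP releases in the Travis matrix above) covers `get_env/3` directly, and `get_prop_or_env/4` is small enough to inline from the deleted code. The module name and the `broadcast_mods` key below are illustrative only, not taken from this diff:

```erlang
%% Hypothetical migration sketch for former app_helper callers.
-module(app_helper_migration).
-export([broadcast_mods/0, get_prop_or_env/4]).

%% app_helper:get_env(App, Key, Default) maps directly onto OTP:
broadcast_mods() ->
    application:get_env(plumtree, broadcast_mods, []).

%% get_prop_or_env/4 has no single OTP equivalent, but the deleted
%% implementation can be inlined wherever it is still needed:
get_prop_or_env(Key, Properties, App, Default) ->
    case proplists:get_value(Key, Properties) of
        undefined -> application:get_env(App, Key, Default);
        Value -> Value
    end.
```

Note that `application:get_env/3` returns the value itself (with the default substituted), unlike `application:get_env/2`, which wraps it in `{ok, Value}`; that is exactly the unwrapping the deleted `get_env/3` performed.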
diff --git a/src/dvvset.erl b/src/dvvset.erl deleted file mode 100644 index dabb5f1..0000000 --- a/src/dvvset.erl +++ /dev/null @@ -1,460 +0,0 @@ -%%------------------------------------------------------------------- -%% -%% File: dvvset.erl -%% -%% @author Ricardo Tomé Gonçalves -%% @author Paulo Sérgio Almeida -% -%% @copyright The MIT License (MIT) -%% Copyright (C) 2013 -%% -%% Permission is hereby granted, free of charge, to any person obtaining a copy of this software and -%% associated documentation files (the "Software"), to deal in the Software without restriction, -%% including without limitation the rights to use, copy, modify, merge, publish, distribute, -%% sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is -%% furnished to do so, subject to the following conditions: -%% -%% The above copyright notice and this permission notice shall be included in all copies or -%% substantial portions of the Software. -%% -%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING -%% BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -%% NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, -%% DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -%% -%% @doc -%% An Erlang implementation of *compact* Dotted Version Vectors, which -%% provides a container for a set of concurrent values (siblings) with causal -%% order information. -%% -%% For further reading, visit the -%% github page. -%% @end -%% -%% @reference -%% -%% Dotted Version Vectors: Logical Clocks for Optimistic Replication -%% -%% @end -%% -%%------------------------------------------------------------------- --module(dvvset). - --ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). --endif. - --export([new/1, - new/2, - sync/1, - join/1, - update/2, - update/3, - size/1, - ids/1, - values/1, - equal/2, - less/2, - map/2, - last/2, - lww/2, - reconcile/2 - ]). - --export_type([clock/0, vector/0, id/0, value/0]). - -% % @doc -%% STRUCTURE details: -%% * entries() are sorted by id() -%% * each counter() also includes the number of values in that id() -%% * the values in each triple of entries() are causally ordered and each new value goes to the head of the list - --type clock() :: {entries(), values()}. --type vector() :: [{id(), counter()}]. --type entries() :: [{id(), counter(), values()}]. --type id() :: any(). --type values() :: [value()]. --type value() :: any(). --type counter() :: non_neg_integer(). - - -%% @doc Constructs a new clock set without causal history, -%% and receives a list of values that gos to the anonymous list. --spec new(value() | [value()]) -> clock(). -new(Vs) when is_list(Vs) -> {[], Vs}; -new(V) -> {[], [V]}. - -%% @doc Constructs a new clock set with the causal history -%% of the given version vector / vector clock, -%% and receives a list of values that gos to the anonymous list. -%% The version vector SHOULD BE a direct result of join/1. --spec new(vector(), value() | [value()]) -> clock(). -new(VV, Vs) when is_list(Vs) -> - VVS = lists:sort(VV), % defense against non-order preserving serialization - {[{I, N, []} || {I, N} <- VVS], Vs}; -new(VV, V) -> new(VV, [V]). - -%% @doc Synchronizes a list of clocks using sync/2. -%% It discards (causally) outdated values, -%% while merging all causal histories. 
--spec sync([clock()]) -> clock(). -sync(L) -> lists:foldl(fun sync/2, {}, L). - -%% Private function --spec sync(clock(), clock()) -> clock(). -sync({}, C) -> C; -sync(C ,{}) -> C; -sync(C1={E1,V1},C2={E2,V2}) -> - V = case less(C1,C2) of - true -> V2; % C1 < C2 => return V2 - false -> case less(C2,C1) of - true -> V1; % C2 < C1 => return V1 - false -> % keep all unique anonymous values and sync entries() - sets:to_list(sets:from_list(V1++V2)) - end - end, - {sync2(E1,E2),V}. - -%% Private function --spec sync2(entries(), entries()) -> entries(). -sync2([], C) -> C; -sync2(C, []) -> C; -sync2([{I1, N1, L1}=H1 | T1]=C1, [{I2, N2, L2}=H2 | T2]=C2) -> - if - I1 < I2 -> [H1 | sync2(T1, C2)]; - I1 > I2 -> [H2 | sync2(T2, C1)]; - true -> [merge(I1, N1, L1, N2, L2) | sync2(T1, T2)] - end. - -%% Private function --spec merge(id(), counter(), values(), counter(), values()) -> {id(), counter(), values()}. -merge(I, N1, L1, N2, L2) -> - LL1 = length(L1), - LL2 = length(L2), - case N1 >= N2 of - true -> - case N1 - LL1 >= N2 - LL2 of - true -> {I, N1, L1}; - false -> {I, N1, lists:sublist(L1, N1 - N2 + LL2)} - end; - false -> - case N2 - LL2 >= N1 - LL1 of - true -> {I, N2, L2}; - false -> {I, N2, lists:sublist(L2, N2 - N1 + LL1)} - end - end. - - -%% @doc Return a version vector that represents the causal history. --spec join(clock()) -> vector(). -join({C,_}) -> [{I, N} || {I, N, _} <- C]. - -%% @doc Advances the causal history with the given id. -%% The new value is the *anonymous dot* of the clock. -%% The client clock SHOULD BE a direct result of new/2. --spec update(clock(), id()) -> clock(). -update({C,[V]}, I) -> {event(C, I, V), []}. - -%% @doc Advances the causal history of the -%% first clock with the given id, while synchronizing -%% with the second clock, thus the new clock is -%% causally newer than both clocks in the argument. -%% The new value is the *anonymous dot* of the clock. -%% The first clock SHOULD BE a direct result of new/2, -%% which is intended to be the client clock with -%% the new value in the *anonymous dot* while -%% the second clock is from the local server. --spec update(clock(), clock(), id()) -> clock(). -update({Cc,[V]}, Cr, I) -> - %% Sync both clocks without the new value - {C,Vs} = sync({Cc,[]}, Cr), - %% We create a new event on the synced causal history, - %% with the id I and the new value. - %% The anonymous values that were synced still remain. - {event(C, I, V), Vs}. - -%% Private function --spec event(entries(), id(), value()) -> entries(). -event([], I, V) -> [{I, 1, [V]}]; -event([{I, N, L} | T], I, V) -> [{I, N+1, [V | L]} | T]; -event([{I1, _, _} | _]=C, I, V) when I1 > I -> [{I, 1, [V]} | C]; -event([H | T], I, V) -> [H | event(T, I, V)]. - -%% @doc Returns the total number of values in this clock set. --spec size(clock()) -> non_neg_integer(). -size({C,Vs}) -> lists:sum([length(L) || {_,_,L} <- C]) + length(Vs). - -%% @doc Returns all the ids used in this clock set. --spec ids(clock()) -> [id()]. -ids({C,_}) -> ([I || {I,_,_} <- C]). - -%% @doc Returns all the values used in this clock set, -%% including the anonymous values. --spec values(clock()) -> [value()]. -values({C,Vs}) -> Vs ++ lists:append([L || {_,_,L} <- C]). - -%% @doc Compares the equality of both clocks, regarding -%% only the causal histories, thus ignoring the values. --spec equal(clock() | vector(), clock() | vector()) -> boolean(). -equal({C1,_},{C2,_}) -> equal2(C1,C2); % DVVSet -equal(C1,C2) when is_list(C1) and is_list(C2) -> equal2(C1,C2). 
%vector clocks - -%% Private function --spec equal2(vector(), vector()) -> boolean(). -equal2([], []) -> true; -equal2([{I, C, L1} | T1], [{I, C, L2} | T2]) - when length(L1) =:= length(L2) -> - equal2(T1, T2); -equal2(_, _) -> false. - -%% @doc Returns True if the first clock is causally older than -%% the second clock, thus values on the first clock are outdated. -%% Returns False otherwise. --spec less(clock(), clock()) -> boolean(). -less({C1,_}, {C2,_}) -> greater(C2, C1, false). - -%% Private function --spec greater(vector(), vector(), boolean()) -> boolean(). -greater([], [], Strict) -> Strict; -greater([_|_], [], _) -> true; -greater([], [_|_], _) -> false; -greater([{I, N1, _} | T1], [{I, N2, _} | T2], Strict) -> - if - N1 == N2 -> greater(T1, T2, Strict); - N1 > N2 -> greater(T1, T2, true); - N1 < N2 -> false - end; -greater([{I1, _, _} | T1], [{I2, _, _} | _]=C2, _) when I1 < I2 -> greater(T1, C2, true); -greater(_, _, _) -> false. - -%% @doc Maps (applies) a function on all values in this clock set, -%% returning the same clock set with the updated values. --spec map(fun((value()) -> value()), clock()) -> clock(). -map(F, {C,Vs}) -> - {[ {I, N, lists:map(F, V)} || {I, N, V} <- C], lists:map(F, Vs)}. - - -%% @doc Return a clock with the same causal history, but with only one -%% value in the anonymous placeholder. This value is the result of -%% the function F, which takes all values and returns a single new value. --spec reconcile(Winner::fun(([value()]) -> value()), clock()) -> clock(). -reconcile(F, C) -> - V = F(values(C)), - new(join(C),[V]). - -%% @doc Returns the latest value in the clock set, -%% according to function F(A,B), which returns *true* if -%% A compares less than or equal to B, false otherwise. --spec last(LessOrEqual::fun((value(),value()) -> boolean()), clock()) -> value(). -last(F, C) -> - {_ ,_ , V2} = find_entry(F, C), - V2. - -%% @doc Return a clock with the same causal history, but with only one -%% value in its original position. This value is the newest value -%% in the given clock, according to function F(A,B), which returns *true* -%% if A compares less than or equal to B, false otherwise. --spec lww(LessOrEqual::fun((value(),value()) -> boolean()), clock()) -> clock(). -lww(F, C={E,_}) -> - case find_entry(F, C) of - {id, I, V} -> {join_and_replace(I, V, E),[]}; - {anonym, _, V} -> new(join(C),[V]) - end. - -%% find_entry/2 - Private function -find_entry(F, {[], [V|T]}) -> find_entry(F, null, V, {[],T}, anonym); -find_entry(F, {[{_, _, []} | T], Vs}) -> find_entry(F, {T,Vs}); -find_entry(F, {[{I, _, [V|_]} | T], Vs}) -> find_entry(F, I, V, {T,Vs}, id). - -%% find_entry/5 - Private function -find_entry(F, I, V, C, Flag) -> - Fun = fun (A,B) -> - case F(A,B) of - false -> {left,A}; % A is newer than B - true -> {right,B} % A is older than B - end - end, - find_entry2(Fun, I, V, C, Flag). - -%% find_entry2/5 - Private function -find_entry2(_, I, V, {[], []}, anonym) -> {anonym, I , V}; -find_entry2(_, I, V, {[], []}, id) -> {id, I, V}; -find_entry2(F, I, V, {[], [V1 | T]}, Flag) -> - case F(V, V1) of - {left,V2} -> find_entry2(F, I, V2, {[],T}, Flag); - {right,V2} -> find_entry2(F, I, V2, {[],T}, anonym) - end; -find_entry2(F, I, V, {[{_, _, []} | T], Vs}, Flag) -> find_entry2(F, I, V, {T, Vs}, Flag); -find_entry2(F, I, V, {[{I1, _, [V1|_]} | T], Vs}, Flag) -> - case F(V, V1) of - {left,V2} -> find_entry2(F, I, V2, {T, Vs}, Flag); - {right,V2} -> find_entry2(F, I1, V2, {T, Vs}, Flag) - end. 
- -%% Private function -join_and_replace(Ir, V, C) -> - [if - I == Ir -> {I, N, [V]}; - true -> {I, N, []} - end - || {I, N, _} <- C]. - - -%% =================================================================== -%% EUnit tests -%% =================================================================== --ifdef(TEST). - - -join_test() -> - A = new([v1]), - A1 = update(A,a), - B = new(join(A1),[v2]), - B1 = update(B, A1, b), - ?assertEqual( join(A) , [] ), - ?assertEqual( join(A1) , [{a,1}] ), - ?assertEqual( join(B1) , [{a,1},{b,1}] ), - ok. - -update_test() -> - A0 = update(new([v1]),a), - A1 = update(new(join(A0),[v2]), A0, a), - A2 = update(new(join(A1),[v3]), A1, b), - A3 = update(new(join(A0),[v4]), A1, b), - A4 = update(new(join(A0),[v5]), A1, a), - ?assertEqual( A0 , {[{a,1,[v1]}],[]} ), - ?assertEqual( A1 , {[{a,2,[v2]}],[]} ), - ?assertEqual( A2 , {[{a,2,[]}, {b,1,[v3]}],[]} ), - ?assertEqual( A3 , {[{a,2,[v2]}, {b,1,[v4]}],[]} ), - ?assertEqual( A4 , {[{a,3,[v5,v2]}],[]} ), - ok. - -sync_test() -> - X = {[{x,1,[]}],[]}, - A = update(new([v1]),a), - Y = update(new([v2]),b), - A1 = update(new(join(A),[v2]), a), - A3 = update(new(join(A1),[v3]), b), - A4 = update(new(join(A1),[v3]), c), - F = fun (L,R) -> L>R end, - W = {[{a,1,[]}],[]}, - Z = {[{a,2,[v2,v1]}],[]}, - ?assertEqual( sync([W,Z]) , {[{a,2,[v2]}],[]} ), - ?assertEqual( sync([W,Z]) , sync([Z,W]) ), - ?assertEqual( sync([A,A1]) , sync([A1,A]) ), - ?assertEqual( sync([A4,A3]) , sync([A3,A4]) ), - ?assertEqual( sync([A4,A3]) , {[{a,2,[]}, {b,1,[v3]}, {c,1,[v3]}],[]} ), - ?assertEqual( sync([X,A]) , {[{a,1,[v1]},{x,1,[]}],[]} ), - ?assertEqual( sync([X,A]) , sync([A,X]) ), - ?assertEqual( sync([X,A]) , sync([A,X]) ), - ?assertEqual( sync([A,Y]) , {[{a,1,[v1]},{b,1,[v2]}],[]} ), - ?assertEqual( sync([Y,A]) , sync([A,Y]) ), - ?assertEqual( sync([Y,A]) , sync([A,Y]) ), - ?assertEqual( sync([A,X]) , sync([X,A]) ), - ?assertEqual( lww(F,A4) , sync([A4,lww(F,A4)]) ), - ok. - -syn_update_test() -> - A0 = update(new([v1]), a), % Mary writes v1 w/o VV - VV1 = join(A0), % Peter reads v1 with version vector (VV) - A1 = update(new([v2]), A0, a), % Mary writes v2 w/o VV - A2 = update(new(VV1,[v3]), A1, a), % Peter writes v3 with VV from v1 - ?assertEqual( VV1 , [{a,1}] ), - ?assertEqual( A0 , {[{a,1,[v1]}],[]} ), - ?assertEqual( A1 , {[{a,2,[v2,v1]}],[]} ), - %% now A2 should only have v2 and v3, since v3 was causally newer than v1 - ?assertEqual( A2 , {[{a,3,[v3,v2]}],[]} ), - ok. - -event_test() -> - {A,_} = update(new([v1]),a), - ?assertEqual( event(A,a,v2) , [{a,2,[v2,v1]}] ), - ?assertEqual( event(A,b,v2) , [{a,1,[v1]}, {b,1,[v2]}] ), - ok. - -lww_last_test() -> - F = fun (A,B) -> A =< B end, - F2 = fun ({_,TS1}, {_,TS2}) -> TS1 =< TS2 end, - X = {[{a,4,[5,2]},{b,1,[]},{c,1,[3]}],[]}, - Y = {[{a,4,[5,2]},{b,1,[]},{c,1,[3]}],[10,0]}, - Z = {[{a,4,[5,2]}, {b,1,[1]}], [3]}, - A = {[{a,4,[{5, 1002345}, {7, 1002340}]}, {b,1,[{4, 1001340}]}], [{2, 1001140}]}, - ?assertEqual( last(F,X) , 5 ), - ?assertEqual( last(F,Y) , 10 ), - ?assertEqual( lww(F,X) , {[{a,4,[5]},{b,1,[]},{c,1,[]}],[]} ), - ?assertEqual( lww(F,Y) , {[{a,4,[]},{b,1,[]},{c,1,[]}],[10]} ), - ?assertEqual( lww(F,Z) , {[{a,4,[5]},{b,1,[]}],[]} ), - ?assertEqual( lww(F2,A) , {[{a,4,[{5, 1002345}]}, {b,1,[]}], []} ), - ok. 
- -reconcile_test() -> - F1 = fun (L) -> lists:sum(L) end, - F2 = fun (L) -> hd(lists:sort(L)) end, - X = {[{a,4,[5,2]},{b,1,[]},{c,1,[3]}],[]}, - Y = {[{a,4,[5,2]},{b,1,[]},{c,1,[3]}],[10,0]}, - ?assertEqual( reconcile(F1,X) , {[{a,4,[]},{b,1,[]},{c,1,[]}],[10]} ), - ?assertEqual( reconcile(F1,Y) , {[{a,4,[]},{b,1,[]},{c,1,[]}],[20]} ), - ?assertEqual( reconcile(F2,X) , {[{a,4,[]},{b,1,[]},{c,1,[]}],[2]} ), - ?assertEqual( reconcile(F2,Y) , {[{a,4,[]},{b,1,[]},{c,1,[]}],[0]} ), - ok. - -less_test() -> - A = update(new(v1),[a]), - B = update(new(join(A),[v2]), a), - B2 = update(new(join(A),[v2]), b), - B3 = update(new(join(A),[v2]), z), - C = update(new(join(B),[v3]), A, c), - D = update(new(join(C),[v4]), B2, d), - ?assert( less(A,B) ), - ?assert( less(A,C) ), - ?assert( less(B,C) ), - ?assert( less(B,D) ), - ?assert( less(B2,D) ), - ?assert( less(A,D) ), - ?assertNot( less(B2,C) ), - ?assertNot( less(B,B2) ), - ?assertNot( less(B2,B) ), - ?assertNot( less(A,A) ), - ?assertNot( less(C,C) ), - ?assertNot( less(D,B2) ), - ?assertNot( less(B3,D) ), - ok. - -equal_test() -> - A = {[{a,4,[v5,v0]},{b,0,[]},{c,1,[v3]}], [v0]}, - B = {[{a,4,[v555,v0]}, {b,0,[]}, {c,1,[v3]}], []}, - C = {[{a,4,[v5,v0]},{b,0,[]}], [v6,v1]}, - % compare only the causal history - ?assert( equal(A,B) ), - ?assert( equal(B,A) ), - ?assertNot( equal(A,C) ), - ?assertNot( equal(B,C) ), - ok. - -size_test() -> - ?assertEqual( 1 , ?MODULE:size(new([v1])) ), - ?assertEqual( 5 , ?MODULE:size({[{a,4,[v5,v0]},{b,0,[]},{c,1,[v3]}],[v4,v1]}) ), - ok. - -ids_values_test() -> - A = {[{a,4,[v0,v5]},{b,0,[]},{c,1,[v3]}], [v1]}, - B = {[{a,4,[v0,v555]}, {b,0,[]}, {c,1,[v3]}], []}, - C = {[{a,4,[]},{b,0,[]}], [v1,v6]}, - ?assertEqual( ids(A) , [a,b,c] ), - ?assertEqual( ids(B) , [a,b,c] ), - ?assertEqual( ids(C) , [a,b] ), - ?assertEqual( lists:sort(values(A)) , [v0,v1,v3,v5] ), - ?assertEqual( lists:sort(values(B)) , [v0,v3,v555] ), - ?assertEqual( lists:sort(values(C)) , [v1,v6] ), - ok. - -map_test() -> - A = {[{a,4,[]},{b,0,[]},{c,1,[]}],[10]}, - B = {[{a,4,[5,0]},{b,0,[]},{c,1,[2]}],[20,10]}, - F = fun (X) -> X*2 end, - ?assertEqual( map(F,A) , {[{a,4,[]},{b,0,[]},{c,1,[]}],[20]} ), - ?assertEqual( map(F,B) , {[{a,4,[10,0]},{b,0,[]},{c,1,[4]}],[40,20]} ), - ok. - --endif. diff --git a/src/hashtree.erl b/src/hashtree.erl deleted file mode 100644 index f6da129..0000000 --- a/src/hashtree.erl +++ /dev/null @@ -1,1235 +0,0 @@ -%% ------------------------------------------------------------------- -%% -%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved. -%% -%% This file is provided to you under the Apache License, -%% Version 2.0 (the "License"); you may not use this file -%% except in compliance with the License. You may obtain -%% a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, -%% software distributed under the License is distributed on an -%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -%% KIND, either express or implied. See the License for the -%% specific language governing permissions and limitations -%% under the License. -%% -%% ------------------------------------------------------------------- - -%% @doc -%% This module implements a persistent, on-disk hash tree that is used -%% predominately for active anti-entropy exchange in Riak. The tree consists -%% of two parts, a set of unbounded on-disk segments and a fixed size hash -%% tree (that may be on-disk or in-memory) constructed over these segments. 
-%% -%% A graphical description of this design can be found in: docs/hashtree.md -%% -%% Each segment logically represents an on-disk list of (key, hash) pairs. -%% Whereas the hash tree is represented as a set of levels and buckets, with a -%% fixed width (or fan-out) between levels that determines how many buckets of -%% a child level are grouped together and hashed to represent a bucket at the -%% parent level. Each leaf in the tree corresponds to a hash of one of the -%% on-disk segments. For example, a tree with a width of 4 and 16 segments -%% would look like the following: -%% -%% level buckets -%% 1: [0] -%% 2: [0 1 2 3] -%% 3: [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15] -%% -%% With each bucket entry of the form ``{bucket-id, hash}'', eg. ``{0, -%% binary()}''. The hash for each of the entries at level 3 would come from -%% one of the 16 segments, while the hashes for entries at level 1 and 2 are -%% derived from the lower levels. -%% -%% Specifically, the bucket entries in level 2 would come from level 3: -%% 0: hash([ 0 1 2 3]) -%% 1: hash([ 4 5 6 7]) -%% 2: hash([ 8 9 10 11]) -%% 3: hash([12 13 14 15]) -%% -%% And the bucket entries in level 1 would come from level 2: -%% 1: hash([hash([ 0 1 2 3]) -%% hash([ 4 5 6 7]) -%% hash([ 8 9 10 11]) -%% hash([12 13 14 15])]) -%% -%% When a (key, hash) pair is added to the tree, the key is hashed to -%% determine which segment it belongs to and inserted/upserted into the -%% segment. Rather than update the hash tree on every insert, a dirty bit is -%% set to note that a given segment has changed. The hashes are then updated -%% in bulk before performing a tree exchange -%% -%% To update the hash tree, the code iterates over each dirty segment, -%% building a list of (key, hash) pairs. A hash is computed over this list, -%% and the leaf node in the hash tree corresponding to the given segment is -%% updated. After iterating over all dirty segments, and thus updating all -%% leaf nodes, the update then continues to update the tree bottom-up, -%% updating only paths that have changed. As designed, the update requires a -%% single sparse scan over the on-disk segments and a minimal traversal up the -%% hash tree. -%% -%% The heavy-lifting of this module is provided by LevelDB. What is logically -%% viewed as sorted on-disk segments is in reality a range of on-disk -%% (segment, key, hash) values written to LevelDB. Each insert of a (key, -%% hash) pair therefore corresponds to a single LevelDB write (no read -%% necessary). Likewise, the update operation is performed using LevelDB -%% iterators. -%% -%% When used for active anti-entropy in Riak, the hash tree is built once and -%% then updated in real-time as writes occur. A key design goal is to ensure -%% that adding (key, hash) pairs to the tree is non-blocking, even during a -%% tree update or a tree exchange. This is accomplished using LevelDB -%% snapshots. Inserts into the tree always write directly to the active -%% LevelDB instance, however updates and exchanges operate over a snapshot of -%% the tree. -%% -%% In order to improve performance, writes are buffered in memory and sent -%% to LevelDB using a single batch write. Writes are flushed whenever the -%% buffer becomes full, as well as before updating the hashtree. -%% -%% Tree exchange is provided by the ``compare/4'' function. 
-%% The behavior of this function is determined through a provided function -%% that implements logic to get buckets and segments for a given remote tree, -%% as well as a callback invoked as key differences are determined. This -%% generic interface allows for tree exchange to be implemented in a variety -%% of ways, including directly against to local hash tree instances, over -%% distributed Erlang, or over a custom protocol over a TCP socket. See -%% ``local_compare/2'' and ``do_remote/1'' for examples (-ifdef(TEST) only). - --module(hashtree). - --export([new/0, - new/2, - new/3, - insert/3, - insert/4, - delete/2, - update_tree/1, - update_snapshot/1, - update_perform/1, - rehash_tree/1, - flush_buffer/1, - close/1, - destroy/1, - read_meta/2, - write_meta/3, - compare/4, - top_hash/1, - get_bucket/3, - key_hashes/2, - levels/1, - segments/1, - width/1, - mem_levels/1]). - --ifdef(TEST). --export([local_compare/2]). --export([run_local/0, - run_local/1, - run_concurrent_build/0, - run_concurrent_build/1, - run_concurrent_build/2, - run_multiple/2, - run_remote/0, - run_remote/1]). --endif. % TEST - --ifdef(EQC). --export([prop_correct/0]). --include_lib("eqc/include/eqc.hrl"). --endif. - --ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). --endif. - --define(NUM_SEGMENTS, (1024*1024)). --define(WIDTH, 1024). --define(MEM_LEVELS, 0). - --type tree_id_bin() :: <<_:176>>. --type segment_bin() :: <<_:256, _:_*8>>. --type bucket_bin() :: <<_:320>>. --type meta_bin() :: <<_:8, _:_*8>>. - --type proplist() :: proplists:proplist(). --type orddict() :: orddict:orddict(). --type index() :: non_neg_integer(). - --type keydiff() :: {missing | remote_missing | different, binary()}. - --type remote_fun() :: fun((get_bucket | key_hashes | init | final, - {integer(), integer()} | integer() | term()) -> any()). - --type acc_fun(Acc) :: fun(([keydiff()], Acc) -> Acc). - --type select_fun(T) :: fun((orddict()) -> T). - --record(state, {id :: tree_id_bin(), - index :: index(), - levels :: pos_integer(), - segments :: pos_integer(), - width :: pos_integer(), - mem_levels :: integer(), - tree :: dict:dict(), - ref :: term(), - path :: string(), - itr :: term(), - write_buffer :: [{put, binary(), binary()} | {delete, binary()}], - write_buffer_count :: integer(), - dirty_segments :: array:array() - }). - --record(itr_state, {itr :: term(), - id :: tree_id_bin(), - current_segment :: '*' | integer(), - remaining_segments :: ['*' | integer()], - acc_fun :: fun(([{binary(),binary()}]) -> any()), - segment_acc :: [{binary(), binary()}], - final_acc :: [{integer(), any()}], - prefetch=false :: boolean() - }). - --opaque hashtree() :: #state{}. --export_type([hashtree/0, - tree_id_bin/0, - keydiff/0, - remote_fun/0, - acc_fun/1]). - -%%%=================================================================== -%%% API -%%%=================================================================== - --spec new() -> hashtree(). -new() -> - new({0,0}). - --spec new({index(), tree_id_bin() | non_neg_integer()}) -> hashtree(). -new(TreeId) -> - State = new_segment_store([], #state{}), - new(TreeId, State, []). - --spec new({index(), tree_id_bin() | non_neg_integer()}, proplist()) -> hashtree(); - ({index(), tree_id_bin() | non_neg_integer()}, hashtree()) -> hashtree(). -new(TreeId, Options) when is_list(Options) -> - State = new_segment_store(Options, #state{}), - new(TreeId, State, Options); -new(TreeId, LinkedStore = #state{}) -> - new(TreeId, LinkedStore, []). 
- --spec new({index(), tree_id_bin() | non_neg_integer()}, - hashtree(), - proplist()) -> hashtree(). -new({Index,TreeId}, LinkedStore, Options) -> - NumSegments = proplists:get_value(segments, Options, ?NUM_SEGMENTS), - Width = proplists:get_value(width, Options, ?WIDTH), - MemLevels = proplists:get_value(mem_levels, Options, ?MEM_LEVELS), - NumLevels = erlang:trunc(math:log(NumSegments) / math:log(Width)) + 1, - State = #state{id=encode_id(TreeId), - index=Index, - levels=NumLevels, - segments=NumSegments, - width=Width, - mem_levels=MemLevels, - %% dirty_segments=gb_sets:new(), - dirty_segments=bitarray_new(NumSegments), - write_buffer=[], - write_buffer_count=0, - tree=dict:new()}, - State2 = share_segment_store(State, LinkedStore), - State2. - --spec close(hashtree()) -> hashtree(). -close(State) -> - close_iterator(State#state.itr), - catch eleveldb:close(State#state.ref), - State#state{itr=undefined}. - -close_iterator(Itr) -> - try - eleveldb:iterator_close(Itr) - catch - _:_ -> - ok - end. - --spec destroy(string() | hashtree()) -> ok | hashtree(). -destroy(Path) when is_list(Path) -> - ok = eleveldb:destroy(Path, []); -destroy(State) -> - %% Assumption: close was already called on all hashtrees that - %% use this LevelDB instance, - ok = eleveldb:destroy(State#state.path, []), - State. - --spec insert(binary(), binary(), hashtree()) -> hashtree(). -insert(Key, ObjHash, State) -> - insert(Key, ObjHash, State, []). - --spec insert(binary(), binary(), hashtree(), proplist()) -> hashtree(). -insert(Key, ObjHash, State, Opts) -> - Hash = erlang:phash2(Key), - Segment = Hash rem State#state.segments, - HKey = encode(State#state.id, Segment, Key), - case should_insert(HKey, Opts, State) of - true -> - State2 = enqueue_action({put, HKey, ObjHash}, State), - %% Dirty = gb_sets:add_element(Segment, State2#state.dirty_segments), - Dirty = bitarray_set(Segment, State2#state.dirty_segments), - State2#state{dirty_segments=Dirty}; - false -> - State - end. - -enqueue_action(Action, State) -> - WBuffer = [Action|State#state.write_buffer], - WCount = State#state.write_buffer_count + 1, - State2 = State#state{write_buffer=WBuffer, - write_buffer_count=WCount}, - State3 = maybe_flush_buffer(State2), - State3. - -maybe_flush_buffer(State=#state{write_buffer_count=WCount}) -> - Threshold = 200, - case WCount > Threshold of - true -> - flush_buffer(State); - false -> - State - end. - -flush_buffer(State=#state{write_buffer=WBuffer}) -> - %% Write buffer is built backwards, reverse to build update list - Updates = lists:reverse(WBuffer), - ok = eleveldb:write(State#state.ref, Updates, []), - State#state{write_buffer=[], - write_buffer_count=0}. - --spec delete(binary(), hashtree()) -> hashtree(). -delete(Key, State) -> - Hash = erlang:phash2(Key), - Segment = Hash rem State#state.segments, - HKey = encode(State#state.id, Segment, Key), - State2 = enqueue_action({delete, HKey}, State), - %% Dirty = gb_sets:add_element(Segment, State2#state.dirty_segments), - Dirty = bitarray_set(Segment, State2#state.dirty_segments), - State2#state{dirty_segments=Dirty}. - --spec should_insert(segment_bin(), proplist(), hashtree()) -> boolean(). -should_insert(HKey, Opts, State) -> - IfMissing = proplists:get_value(if_missing, Opts, false), - case IfMissing of - true -> - %% Only insert if object does not already exist - %% TODO: Use bloom filter so we don't always call get here - case eleveldb:get(State#state.ref, HKey, []) of - not_found -> - true; - _ -> - false - end; - _ -> - true - end. 
- --spec update_snapshot(hashtree()) -> {hashtree(), hashtree()}. -update_snapshot(State=#state{segments=NumSegments}) -> - State2 = flush_buffer(State), - SnapState = snapshot(State2), - State3 = SnapState#state{dirty_segments=bitarray_new(NumSegments)}, - {SnapState, State3}. - --spec update_tree(hashtree()) -> hashtree(). -update_tree(State) -> - State2 = flush_buffer(State), - State3 = snapshot(State2), - update_perform(State3). - --spec update_perform(hashtree()) -> hashtree(). -update_perform(State2=#state{dirty_segments=Dirty, segments=NumSegments}) -> - %% Segments = gb_sets:to_list(Dirty), - Segments = bitarray_to_list(Dirty), - State3 = update_tree(Segments, State2), - %% State3#state{dirty_segments=gb_sets:new()}. - State3#state{dirty_segments=bitarray_new(NumSegments)}. - --spec update_tree([integer()], hashtree()) -> hashtree(). -update_tree([], State) -> - State; -update_tree(Segments, State) -> - Hashes = orddict:from_list(hashes(State, Segments)), - Groups = group(Hashes, State#state.width), - LastLevel = State#state.levels, - NewState = update_levels(LastLevel, Groups, State), - NewState. - --spec rehash_tree(hashtree()) -> hashtree(). -rehash_tree(State) -> - State2 = snapshot(State), - rehash_perform(State2). - --spec rehash_perform(hashtree()) -> hashtree(). -rehash_perform(State) -> - Hashes = orddict:from_list(hashes(State, ['*', '*'])), - case Hashes of - [] -> - State; - _ -> - Groups = group(Hashes, State#state.width), - LastLevel = State#state.levels, - NewState = update_levels(LastLevel, Groups, State), - NewState - end. - --spec top_hash(hashtree()) -> [] | [{0, binary()}]. -top_hash(State) -> - get_bucket(1, 0, State). - -compare(Tree, Remote, AccFun, Acc) -> - compare(1, 0, Tree, Remote, AccFun, Acc). - --spec levels(hashtree()) -> pos_integer(). -levels(#state{levels=L}) -> - L. - --spec segments(hashtree()) -> pos_integer(). -segments(#state{segments=S}) -> - S. - --spec width(hashtree()) -> pos_integer(). -width(#state{width=W}) -> - W. - --spec mem_levels(hashtree()) -> integer(). -mem_levels(#state{mem_levels=M}) -> - M. - -%% Note: meta is currently a one per file thing, even if there are multiple -%% trees per file. This is intentional. If we want per tree metadata -%% this will need to be added as a separate thing. --spec write_meta(binary(), binary(), hashtree()) -> hashtree(). -write_meta(Key, Value, State) when is_binary(Key) and is_binary(Value) -> - HKey = encode_meta(Key), - ok = eleveldb:put(State#state.ref, HKey, Value, []), - State. - --spec read_meta(binary(), hashtree()) -> {ok, binary()} | undefined. -read_meta(Key, State) when is_binary(Key) -> - HKey = encode_meta(Key), - case eleveldb:get(State#state.ref, HKey, []) of - {ok, Value} -> - {ok, Value}; - _ -> - undefined - end. - --spec key_hashes(hashtree(), integer()) -> [{integer(), orddict()}]. -key_hashes(State, Segment) -> - multi_select_segment(State, [Segment], fun(X) -> X end). - --spec get_bucket(integer(), integer(), hashtree()) -> orddict(). -get_bucket(Level, Bucket, State) -> - case Level =< State#state.mem_levels of - true -> - get_memory_bucket(Level, Bucket, State); - false -> - get_disk_bucket(Level, Bucket, State) - end. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== - --ifndef(old_hash). -md5(Bin) -> - crypto:hash(md5, Bin). - --ifdef(TEST). -esha(Bin) -> - crypto:hash(sha, Bin). --endif. - -esha_init() -> - crypto:hash_init(sha). 
- -esha_update(Ctx, Bin) -> - crypto:hash_update(Ctx, Bin). - -esha_final(Ctx) -> - crypto:hash_final(Ctx). --else. -md5(Bin) -> - crypto:md5(Bin). - --ifdef(TEST). -esha(Bin) -> - crypto:sha(Bin). --endif. - -esha_init() -> - crypto:sha_init(). - -esha_update(Ctx, Bin) -> - crypto:sha_update(Ctx, Bin). - -esha_final(Ctx) -> - crypto:sha_final(Ctx). --endif. - --spec set_bucket(integer(), integer(), any(), hashtree()) -> hashtree(). -set_bucket(Level, Bucket, Val, State) -> - case Level =< State#state.mem_levels of - true -> - set_memory_bucket(Level, Bucket, Val, State); - false -> - set_disk_bucket(Level, Bucket, Val, State) - end. - --spec new_segment_store(proplist(), hashtree()) -> hashtree(). -new_segment_store(Opts, State) -> - DataDir = case proplists:get_value(segment_path, Opts) of - undefined -> - Root = "/tmp/anti/level", - <> = md5(term_to_binary({erlang:now(), make_ref()})), - filename:join(Root, integer_to_list(P)); - SegmentPath -> - SegmentPath - end, - - DefaultWriteBufferMin = 4 * 1024 * 1024, - DefaultWriteBufferMax = 14 * 1024 * 1024, - ConfigVars = get_env(anti_entropy_leveldb_opts, - [{write_buffer_size_min, DefaultWriteBufferMin}, - {write_buffer_size_max, DefaultWriteBufferMax}]), - Config = orddict:from_list(ConfigVars), - - %% Use a variable write buffer size to prevent against all buffers being - %% flushed to disk at once when under a heavy uniform load. - WriteBufferMin = proplists:get_value(write_buffer_size_min, Config, DefaultWriteBufferMin), - WriteBufferMax = proplists:get_value(write_buffer_size_max, Config, DefaultWriteBufferMax), - {Offset, _} = random:uniform_s(1 + WriteBufferMax - WriteBufferMin, now()), - WriteBufferSize = WriteBufferMin + Offset, - Config2 = orddict:store(write_buffer_size, WriteBufferSize, Config), - Config3 = orddict:erase(write_buffer_size_min, Config2), - Config4 = orddict:erase(write_buffer_size_max, Config3), - Config5 = orddict:store(is_internal_db, true, Config4), - Config6 = orddict:store(use_bloomfilter, true, Config5), - Options = orddict:store(create_if_missing, true, Config6), - - ok = filelib:ensure_dir(DataDir), - {ok, Ref} = eleveldb:open(DataDir, Options), - State#state{ref=Ref, path=DataDir}. - --spec share_segment_store(hashtree(), hashtree()) -> hashtree(). -share_segment_store(State, #state{ref=Ref, path=Path}) -> - State#state{ref=Ref, path=Path}. - --spec hash(term()) -> binary(). -hash(X) -> - %% erlang:phash2(X). - sha(term_to_binary(X)). - -sha(Bin) -> - Chunk = get_env(anti_entropy_sha_chunk, 4096), - sha(Chunk, Bin). - -sha(Chunk, Bin) -> - Ctx1 = esha_init(), - Ctx2 = sha(Chunk, Bin, Ctx1), - SHA = esha_final(Ctx2), - SHA. - -sha(Chunk, Bin, Ctx) -> - case Bin of - <> -> - Ctx2 = esha_update(Ctx, Data), - sha(Chunk, Rest, Ctx2); - Data -> - Ctx2 = esha_update(Ctx, Data), - Ctx2 - end. - -get_env(Key, Default) -> - CoreEnv = app_helper:get_env(plumtree, Key, Default), - app_helper:get_env(riak_kv, Key, CoreEnv). - --spec update_levels(integer(), - [{integer(), [{integer(), binary()}]}], - hashtree()) -> hashtree(). 
-update_levels(0, _, State) -> - State; -update_levels(Level, Groups, State) -> - {NewState, NewBuckets} = - lists:foldl(fun({Bucket, NewHashes}, {StateAcc, BucketsAcc}) -> - Hashes1 = get_bucket(Level, Bucket, StateAcc), - Hashes2 = orddict:from_list(NewHashes), - Hashes3 = orddict:merge(fun(_, _, New) -> New end, - Hashes1, - Hashes2), - StateAcc2 = set_bucket(Level, Bucket, Hashes3, StateAcc), - NewBucket = {Bucket, hash(Hashes3)}, - {StateAcc2, [NewBucket | BucketsAcc]} - end, {State, []}, Groups), - Groups2 = group(NewBuckets, State#state.width), - update_levels(Level - 1, Groups2, NewState). - -%% Takes a list of bucket-hash entries from level X and groups them together -%% into groups representing entries at parent level X-1. -%% -%% For example, given bucket-hash entries at level X: -%% [{1,H1}, {2,H2}, {3,H3}, {4,H4}, {5,H5}, {6,H6}, {7,H7}, {8,H8}] -%% -%% The grouping at level X-1 with a width of 4 would be: -%% [{1,[{1,H1}, {2,H2}, {3,H3}, {4,H4}]}, -%% {2,[{5,H5}, {6,H6}, {7,H7}, {8,H8}]}] -%% --spec group([{integer(), binary()}], pos_integer()) - -> [{integer(), [{integer(), binary()}]}]. -group(L, Width) -> - {FirstId, _} = hd(L), - FirstBucket = FirstId div Width, - {LastBucket, LastGroup, Groups} = - lists:foldl(fun(X={Id, _}, {LastBucket, Acc, Groups}) -> - Bucket = Id div Width, - case Bucket of - LastBucket -> - {LastBucket, [X|Acc], Groups}; - _ -> - {Bucket, [X], [{LastBucket, Acc} | Groups]} - end - end, {FirstBucket, [], []}, L), - [{LastBucket, LastGroup} | Groups]. - --spec get_memory_bucket(integer(), integer(), hashtree()) -> any(). -get_memory_bucket(Level, Bucket, #state{tree=Tree}) -> - case dict:find({Level, Bucket}, Tree) of - error -> - orddict:new(); - {ok, Val} -> - Val - end. - --spec set_memory_bucket(integer(), integer(), any(), hashtree()) -> hashtree(). -set_memory_bucket(Level, Bucket, Val, State) -> - Tree = dict:store({Level, Bucket}, Val, State#state.tree), - State#state{tree=Tree}. - --spec get_disk_bucket(integer(), integer(), hashtree()) -> any(). -get_disk_bucket(Level, Bucket, #state{id=Id, ref=Ref}) -> - HKey = encode_bucket(Id, Level, Bucket), - case eleveldb:get(Ref, HKey, []) of - {ok, Bin} -> - binary_to_term(Bin); - _ -> - orddict:new() - end. - --spec set_disk_bucket(integer(), integer(), any(), hashtree()) -> hashtree(). -set_disk_bucket(Level, Bucket, Val, State=#state{id=Id, ref=Ref}) -> - HKey = encode_bucket(Id, Level, Bucket), - Bin = term_to_binary(Val), - ok = eleveldb:put(Ref, HKey, Bin, []), - State. - --spec encode_id(binary() | non_neg_integer()) -> tree_id_bin(). -encode_id(TreeId) when is_integer(TreeId) -> - if (TreeId >= 0) andalso - (TreeId < ((1 bsl 160)-1)) -> - <>; - true -> - erlang:error(badarg) - end; -encode_id(TreeId) when is_binary(TreeId) and (byte_size(TreeId) == 22) -> - TreeId; -encode_id(_) -> - erlang:error(badarg). - --spec encode(tree_id_bin(), integer(), binary()) -> segment_bin(). -encode(TreeId, Segment, Key) -> - <<$t,TreeId:22/binary,$s,Segment:64/integer,Key/binary>>. - --spec safe_decode(binary()) -> {tree_id_bin() | bad, integer(), binary()}. -safe_decode(Bin) -> - case Bin of - <<$t,TreeId:22/binary,$s,Segment:64/integer,Key/binary>> -> - {TreeId, Segment, Key}; - _ -> - {bad, -1, <<>>} - end. - --spec decode(segment_bin()) -> {tree_id_bin(), non_neg_integer(), binary()}. -decode(Bin) -> - <<$t,TreeId:22/binary,$s,Segment:64/integer,Key/binary>> = Bin, - {TreeId, Segment, Key}. - --spec encode_bucket(tree_id_bin(), integer(), integer()) -> bucket_bin(). 
-encode_bucket(TreeId, Level, Bucket) -> - <<$b,TreeId:22/binary,$b,Level:64/integer,Bucket:64/integer>>. - --spec encode_meta(binary()) -> meta_bin(). -encode_meta(Key) -> - <<$m,Key/binary>>. - --spec hashes(hashtree(), list('*'|integer())) -> [{integer(), binary()}]. -hashes(State, Segments) -> - multi_select_segment(State, Segments, fun hash/1). - --spec snapshot(hashtree()) -> hashtree(). -snapshot(State) -> - %% Abuse eleveldb iterators as snapshots - catch eleveldb:iterator_close(State#state.itr), - {ok, Itr} = eleveldb:iterator(State#state.ref, []), - State#state{itr=Itr}. - --spec multi_select_segment(hashtree(), list('*'|integer()), select_fun(T)) - -> [{integer(), T}]. -multi_select_segment(#state{id=Id, itr=Itr}, Segments, F) -> - [First | Rest] = Segments, - IS1 = #itr_state{itr=Itr, - id=Id, - current_segment=First, - remaining_segments=Rest, - acc_fun=F, - segment_acc=[], - final_acc=[]}, - Seek = case First of - '*' -> - encode(Id, 0, <<>>); - _ -> - encode(Id, First, <<>>) - end, - IS2 = iterate(iterator_move(Itr, Seek), IS1), - #itr_state{current_segment=LastSegment, - segment_acc=LastAcc, - final_acc=FA} = IS2, - Result = [{LastSegment, F(LastAcc)} | FA], - case Result of - [{'*', _}] -> - %% Handle wildcard select when all segments are empty - []; - _ -> - Result - end. - -iterator_move(undefined, _Seek) -> - {error, invalid_iterator}; -iterator_move(Itr, Seek) -> - try - - eleveldb:iterator_move(Itr, Seek) - catch - _:badarg -> - {error, invalid_iterator} - end. - --spec iterate({'error','invalid_iterator'} | {'ok',binary(),binary()}, - #itr_state{}) -> #itr_state{}. -iterate({error, invalid_iterator}, IS=#itr_state{}) -> - IS; -iterate({ok, K, V}, IS=#itr_state{itr=Itr, - id=Id, - current_segment=CurSeg, - remaining_segments=Segments, - acc_fun=F, - segment_acc=Acc, - final_acc=FinalAcc}) -> - {SegId, Seg, _} = safe_decode(K), - Segment = case CurSeg of - '*' -> - Seg; - _ -> - CurSeg - end, - case {SegId, Seg, Segments, IS#itr_state.prefetch} of - {bad, -1, _, _} -> - %% Non-segment encountered, end traversal - IS; - {Id, Segment, _, _} -> - %% Still reading existing segment - IS2 = IS#itr_state{current_segment=Segment, - segment_acc=[{K,V} | Acc], - prefetch=true}, - iterate(iterator_move(Itr, prefetch), IS2); - {Id, _, [Seg|Remaining], _} -> - %% Pointing at next segment we are interested in - IS2 = IS#itr_state{current_segment=Seg, - remaining_segments=Remaining, - segment_acc=[{K,V}], - final_acc=[{Segment, F(Acc)} | FinalAcc], - prefetch=true}, - iterate(iterator_move(Itr, prefetch), IS2); - {Id, _, ['*'], _} -> - %% Pointing at next segment we are interested in - IS2 = IS#itr_state{current_segment=Seg, - remaining_segments=['*'], - segment_acc=[{K,V}], - final_acc=[{Segment, F(Acc)} | FinalAcc], - prefetch=true}, - iterate(iterator_move(Itr, prefetch), IS2); - {Id, NextSeg, [NextSeg|Remaining], _} -> - %% A previous prefetch_stop left us at the start of the - %% next interesting segment. 
- IS2 = IS#itr_state{current_segment=NextSeg, - remaining_segments=Remaining, - segment_acc=[{K,V}], - prefetch=true}, - iterate(iterator_move(Itr, prefetch), IS2); - {Id, _, [_NextSeg | _Remaining], true} -> - %% Pointing at uninteresting segment, but need to halt the - %% prefetch to ensure the interator can be reused - IS2 = IS#itr_state{segment_acc=[], - final_acc=[{Segment, F(Acc)} | FinalAcc], - prefetch=false}, - iterate(iterator_move(Itr, prefetch_stop), IS2); - {Id, _, [NextSeg | Remaining], false} -> - %% Pointing at uninteresting segment, seek to next interesting one - Seek = encode(Id, NextSeg, <<>>), - IS2 = IS#itr_state{current_segment=NextSeg, - remaining_segments=Remaining, - segment_acc=[], - final_acc=[{Segment, F(Acc)} | FinalAcc]}, - iterate(iterator_move(Itr, Seek), IS2); - {_, _, _, true} -> - %% Done with traversal, but need to stop the prefetch to - %% ensure the iterator can be reused. The next operation - %% with this iterator is a seek so no need to be concerned - %% with the data returned here. - _ = iterator_move(Itr, prefetch_stop), - IS#itr_state{prefetch=false}; - {_, _, _, false} -> - %% Done with traversal - IS - end. - --spec compare(integer(), integer(), hashtree(), remote_fun(), acc_fun(X), X) -> X. -compare(Level, Bucket, Tree, Remote, AccFun, KeyAcc) when Level == Tree#state.levels+1 -> - Keys = compare_segments(Bucket, Tree, Remote), - AccFun(Keys, KeyAcc); -compare(Level, Bucket, Tree, Remote, AccFun, KeyAcc) -> - HL1 = get_bucket(Level, Bucket, Tree), - HL2 = Remote(get_bucket, {Level, Bucket}), - Union = lists:ukeysort(1, HL1 ++ HL2), - Inter = ordsets:intersection(ordsets:from_list(HL1), - ordsets:from_list(HL2)), - Diff = ordsets:subtract(Union, Inter), - KeyAcc3 = - lists:foldl(fun({Bucket2, _}, KeyAcc2) -> - compare(Level+1, Bucket2, Tree, Remote, AccFun, KeyAcc2) - end, KeyAcc, Diff), - KeyAcc3. - --spec compare_segments(integer(), hashtree(), remote_fun()) -> [keydiff()]. -compare_segments(Segment, Tree=#state{id=Id}, Remote) -> - [{_, KeyHashes1}] = key_hashes(Tree, Segment), - KeyHashes2 = Remote(key_hashes, Segment), - HL1 = orddict:from_list(KeyHashes1), - HL2 = orddict:from_list(KeyHashes2), - Delta = orddict_delta(HL1, HL2), - Keys = [begin - {Id, Segment, Key} = decode(KBin), - Type = key_diff_type(Diff), - {Type, Key} - end || {KBin, Diff} <- Delta], - Keys. - -key_diff_type({'$none', _}) -> - missing; -key_diff_type({_, '$none'}) -> - remote_missing; -key_diff_type(_) -> - different. - -orddict_delta(D1, D2) -> - orddict_delta(D1, D2, []). - -orddict_delta([{K1,V1}|D1], [{K2,_}=E2|D2], Acc) when K1 < K2 -> - Acc2 = [{K1,{V1,'$none'}} | Acc], - orddict_delta(D1, [E2|D2], Acc2); -orddict_delta([{K1,_}=E1|D1], [{K2,V2}|D2], Acc) when K1 > K2 -> - Acc2 = [{K2,{'$none',V2}} | Acc], - orddict_delta([E1|D1], D2, Acc2); -orddict_delta([{K1,V1}|D1], [{_K2,V2}|D2], Acc) -> %K1 == K2 - case V1 of - V2 -> - orddict_delta(D1, D2, Acc); - _ -> - Acc2 = [{K1,{V1,V2}} | Acc], - orddict_delta(D1, D2, Acc2) - end; -orddict_delta([], D2, Acc) -> - L = [{K2,{'$none',V2}} || {K2,V2} <- D2], - L ++ Acc; -orddict_delta(D1, [], Acc) -> - L = [{K1,{V1,'$none'}} || {K1,V1} <- D1], - L ++ Acc. - -%%%=================================================================== -%%% bitarray -%%%=================================================================== --define(W, 27). - --spec bitarray_new(integer()) -> array:array(). -bitarray_new(N) -> array:new((N-1) div ?W + 1, {default, 0}). - --spec bitarray_set(integer(), array:array()) -> array:array(). 
-bitarray_set(I, A) -> - AI = I div ?W, - V = array:get(AI, A), - V1 = V bor (1 bsl (I rem ?W)), - array:set(AI, V1, A). - --spec bitarray_to_list(array:array()) -> [integer()]. -bitarray_to_list(A) -> - lists:reverse( - array:sparse_foldl(fun(I, V, Acc) -> - expand(V, I * ?W, Acc) - end, [], A)). - -%% Convert bit vector into list of integers, with optional offset. -%% expand(2#01, 0, []) -> [0] -%% expand(2#10, 0, []) -> [1] -%% expand(2#1101, 0, []) -> [3,2,0] -%% expand(2#1101, 1, []) -> [4,3,1] -%% expand(2#1101, 10, []) -> [13,12,10] -%% expand(2#1101, 100, []) -> [103,102,100] -expand(0, _, Acc) -> - Acc; -expand(V, N, Acc) -> - Acc2 = - case (V band 1) of - 1 -> - [N|Acc]; - 0 -> - Acc - end, - expand(V bsr 1, N+1, Acc2). - -%%%=================================================================== -%%% Experiments -%%%=================================================================== - --ifdef(TEST). - -run_local() -> - run_local(10000). -run_local(N) -> - timer:tc(fun do_local/1, [N]). - -run_concurrent_build() -> - run_concurrent_build(10000). -run_concurrent_build(N) -> - run_concurrent_build(N, N). -run_concurrent_build(N1, N2) -> - timer:tc(fun do_concurrent_build/2, [N1, N2]). - -run_multiple(Count, N) -> - Tasks = [fun() -> - do_concurrent_build(N, N) - end || _ <- lists:seq(1, Count)], - timer:tc(fun peval/1, [Tasks]). - -run_remote() -> - run_remote(100000). -run_remote(N) -> - timer:tc(fun do_remote/1, [N]). - -do_local(N) -> - A0 = insert_many(N, new()), - A1 = insert(<<"10">>, <<"42">>, A0), - A2 = insert(<<"10">>, <<"42">>, A1), - A3 = insert(<<"13">>, <<"52">>, A2), - - B0 = insert_many(N, new()), - B1 = insert(<<"14">>, <<"52">>, B0), - B2 = insert(<<"10">>, <<"32">>, B1), - B3 = insert(<<"10">>, <<"422">>, B2), - - A4 = update_tree(A3), - B4 = update_tree(B3), - KeyDiff = local_compare(A4, B4), - lager:info("KeyDiff: ~p~n", [KeyDiff]), - close(A4), - close(B4), - destroy(A4), - destroy(B4), - ok. - -do_concurrent_build(N1, N2) -> - F1 = fun() -> - A0 = insert_many(N1, new()), - A1 = insert(<<"10">>, <<"42">>, A0), - A2 = insert(<<"10">>, <<"42">>, A1), - A3 = insert(<<"13">>, <<"52">>, A2), - A4 = update_tree(A3), - A4 - end, - - F2 = fun() -> - B0 = insert_many(N2, new()), - B1 = insert(<<"14">>, <<"52">>, B0), - B2 = insert(<<"10">>, <<"32">>, B1), - B3 = insert(<<"10">>, <<"422">>, B2), - B4 = update_tree(B3), - B4 - end, - - [A4, B4] = peval([F1, F2]), - KeyDiff = local_compare(A4, B4), - lager:info("KeyDiff: ~p~n", [KeyDiff]), - - close(A4), - close(B4), - destroy(A4), - destroy(B4), - ok. - -do_remote(N) -> - %% Spawn new process for remote tree - Other = - spawn(fun() -> - A0 = insert_many(N, new()), - A1 = insert(<<"10">>, <<"42">>, A0), - A2 = insert(<<"10">>, <<"42">>, A1), - A3 = insert(<<"13">>, <<"52">>, A2), - A4 = update_tree(A3), - message_loop(A4, 0, 0) - end), - - %% Build local tree - B0 = insert_many(N, new()), - B1 = insert(<<"14">>, <<"52">>, B0), - B2 = insert(<<"10">>, <<"32">>, B1), - B3 = insert(<<"10">>, <<"422">>, B2), - B4 = update_tree(B3), - - %% Compare with remote tree through message passing - Remote = fun(get_bucket, {L, B}) -> - Other ! {get_bucket, self(), L, B}, - receive {remote, X} -> X end; - (key_hashes, Segment) -> - Other ! {key_hashes, self(), Segment}, - receive {remote, X} -> X end - end, - KeyDiff = compare(B4, Remote), - lager:info("KeyDiff: ~p~n", [KeyDiff]), - - %% Signal spawned process to print stats and exit - Other ! done, - ok. 
- -message_loop(Tree, Msgs, Bytes) -> - receive - {get_bucket, From, L, B} -> - Reply = get_bucket(L, B, Tree), - From ! {remote, Reply}, - Size = byte_size(term_to_binary(Reply)), - message_loop(Tree, Msgs+1, Bytes+Size); - {key_hashes, From, Segment} -> - [{_, KeyHashes2}] = key_hashes(Tree, Segment), - Reply = KeyHashes2, - From ! {remote, Reply}, - Size = byte_size(term_to_binary(Reply)), - message_loop(Tree, Msgs+1, Bytes+Size); - done -> - lager:info("Exchanged messages: ~b~n", [Msgs]), - lager:info("Exchanged bytes: ~b~n", [Bytes]), - ok - end. - -insert_many(N, T1) -> - T2 = - lists:foldl(fun(X, TX) -> - insert(bin(-X), bin(X*100), TX) - end, T1, lists:seq(1,N)), - T2. - -bin(X) -> - list_to_binary(integer_to_list(X)). - -peval(L) -> - Parent = self(), - lists:foldl( - fun(F, N) -> - spawn(fun() -> - Parent ! {peval, N, F()} - end), - N+1 - end, 0, L), - L2 = [receive {peval, N, R} -> {N,R} end || _ <- L], - {_, L3} = lists:unzip(lists:keysort(1, L2)), - L3. - -%%%=================================================================== -%%% EUnit -%%%=================================================================== - --spec local_compare(hashtree(), hashtree()) -> [keydiff()]. -local_compare(T1, T2) -> - Remote = fun(get_bucket, {L, B}) -> - get_bucket(L, B, T2); - (key_hashes, Segment) -> - [{_, KeyHashes2}] = key_hashes(T2, Segment), - KeyHashes2 - end, - compare(T1, Remote). - --spec compare(hashtree(), remote_fun()) -> [keydiff()]. -compare(Tree, Remote) -> - compare(Tree, Remote, fun(Keys, KeyAcc) -> - Keys ++ KeyAcc - end). - --spec compare(hashtree(), remote_fun(), acc_fun(X)) -> X. -compare(Tree, Remote, AccFun) -> - compare(Tree, Remote, AccFun, []). - -%% Verify that `update_tree/1' generates a snapshot of the underlying -%% LevelDB store that is used by `compare', therefore isolating the -%% compare from newer/concurrent insertions into the tree. -snapshot_test() -> - A0 = insert(<<"10">>, <<"42">>, new()), - B0 = insert(<<"10">>, <<"52">>, new()), - A1 = update_tree(A0), - B1 = update_tree(B0), - B2 = insert(<<"10">>, <<"42">>, B1), - KeyDiff = local_compare(A1, B1), - close(A1), - close(B2), - destroy(A1), - destroy(B2), - ?assertEqual([{different, <<"10">>}], KeyDiff), - ok. - -delta_test() -> - T1 = update_tree(insert(<<"1">>, esha(term_to_binary(make_ref())), - new())), - T2 = update_tree(insert(<<"2">>, esha(term_to_binary(make_ref())), - new())), - Diff = local_compare(T1, T2), - ?assertEqual([{remote_missing, <<"1">>}, {missing, <<"2">>}], Diff), - Diff2 = local_compare(T2, T1), - ?assertEqual([{missing, <<"1">>}, {remote_missing, <<"2">>}], Diff2), - ok. --endif. - -%%%=================================================================== -%%% EQC -%%%=================================================================== - --ifdef(EQC). -sha_test_() -> - {spawn, - {timeout, 120, - fun() -> - ?assert(eqc:quickcheck(eqc:testing_time(4, prop_sha()))) - end - }}. - -prop_sha() -> - %% NOTE: Generating 1MB (1024 * 1024) size binaries is incredibly slow - %% with EQC and was using over 2GB of memory - ?FORALL({Size, NumChunks}, {choose(1, 1024), choose(1, 16)}, - ?FORALL(Bin, binary(Size), - begin - %% we need at least one chunk, - %% and then we divide the binary size - %% into the number of chunks (as a natural - %% number) - ChunkSize = max(1, (Size div NumChunks)), - sha(ChunkSize, Bin) =:= esha(Bin) - end)). - -eqc_test_() -> - {spawn, - {timeout, 120, - fun() -> - ?assert(eqc:quickcheck(eqc:testing_time(4, prop_correct()))) - end - }}. 
- -objects() -> - ?SIZED(Size, objects(Size+3)). - -objects(N) -> - ?LET(Keys, shuffle(lists:seq(1,N)), - [{bin(K), binary(8)} || K <- Keys] - ). - -lengths(N) -> - ?LET(MissingN1, choose(0,N), - ?LET(MissingN2, choose(0,N-MissingN1), - ?LET(DifferentN, choose(0,N-MissingN1-MissingN2), - {MissingN1, MissingN2, DifferentN}))). - -mutate(Binary) -> - L1 = binary_to_list(Binary), - [X|Xs] = L1, - X2 = (X+1) rem 256, - L2 = [X2|Xs], - list_to_binary(L2). - -prop_correct() -> - ?FORALL(Objects, objects(), - ?FORALL({MissingN1, MissingN2, DifferentN}, lengths(length(Objects)), - begin - {RemoteOnly, Objects2} = lists:split(MissingN1, Objects), - {LocalOnly, Objects3} = lists:split(MissingN2, Objects2), - {Different, Same} = lists:split(DifferentN, Objects3), - - Different2 = [{Key, mutate(Hash)} || {Key, Hash} <- Different], - - Insert = fun(Tree, Vals) -> - lists:foldl(fun({Key, Hash}, Acc) -> - insert(Key, Hash, Acc) - end, Tree, Vals) - end, - - A0 = new(), - B0 = new(), - - [begin - A1 = new({0,Id}, A0), - B1 = new({0,Id}, B0), - - A2 = Insert(A1, Same), - A3 = Insert(A2, LocalOnly), - A4 = Insert(A3, Different), - - B2 = Insert(B1, Same), - B3 = Insert(B2, RemoteOnly), - B4 = Insert(B3, Different2), - - A5 = update_tree(A4), - B5 = update_tree(B4), - - Expected = - [{missing, Key} || {Key, _} <- RemoteOnly] ++ - [{remote_missing, Key} || {Key, _} <- LocalOnly] ++ - [{different, Key} || {Key, _} <- Different], - - KeyDiff = local_compare(A5, B5), - - ?assertEqual(lists:usort(Expected), - lists:usort(KeyDiff)), - - %% Reconcile trees - A6 = Insert(A5, RemoteOnly), - B6 = Insert(B5, LocalOnly), - B7 = Insert(B6, Different), - A7 = update_tree(A6), - B8 = update_tree(B7), - ?assertEqual([], local_compare(A7, B8)), - true - end || Id <- lists:seq(0, 10)], - close(A0), - close(B0), - destroy(A0), - destroy(B0), - true - end)). - --endif. diff --git a/src/hashtree_tree.erl b/src/hashtree_tree.erl deleted file mode 100644 index 4bbb7b4..0000000 --- a/src/hashtree_tree.erl +++ /dev/null @@ -1,565 +0,0 @@ -%% ------------------------------------------------------------------- -%% -%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. -%% -%% This file is provided to you under the Apache License, -%% Version 2.0 (the "License"); you may not use this file -%% except in compliance with the License. You may obtain -%% a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, -%% software distributed under the License is distributed on an -%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -%% KIND, either express or implied. See the License for the -%% specific language governing permissions and limitations -%% under the License. -%% -%% ------------------------------------------------------------------- - -%% @doc This module implements a specialized hash tree that is used -%% primarily by cluster metadata's anti-entropy exchanges and by -%% metadata clients for determining when groups of metadata keys have -%% changed locally. The tree can be used, generally, for determining -%% the differences in groups of keys, or to find missing groups, between -%% two stores. -%% -%% Each node of the tree is itself a hash tree, specifically a {@link -%% hashtree}. The tree has a fixed height but each node has a -%% variable amount of children. The height of the tree directly -%% corresponds to the number of prefixes supported by the tree. A list -%% of prefixes, or a "prefix list", represent a group of keys. 
Each
-%% unique prefix list is a node in the tree. The leaves store hashes
-%% for the individual keys in the segments of the node's {@link
-%% hashtree}. The buckets of the leaves' hashtree provide an efficient
-%% way of determining when keys in the segments differ between two
-%% trees. The tails of the prefix list are used to roll up groups
-%% into parent groups. For example, the prefixes `[a, b]', `[a, c]',
-%% `[d, e]' will be rolled up into parent groups `a', containing `c'
-%% and `b', and `d', containing only `e'. The parent group's node has
-%% children corresponding to each child group. The top-hashes of the
-%% child nodes are stored in the parent nodes' segments. The parent
-%% nodes' buckets are used as an efficient method for determining when
-%% child groups differ between two trees. The root node corresponds to
-%% the empty list and it acts like any other node, storing hashes for
-%% the first level of child groups. The top hash of the root node is
-%% the top hash of the tree.
-%%
-%% The tree in the example above might store something like:
-%%
-%%            node        parent   top-hash  segments
-%%            ---------------------------------------------------
-%%            root        none     1         [{a, 2}, {d, 3}]
-%%            [a]         root     2         [{b, 4}, {c, 5}]
-%%            [d]         root     3         [{e, 6}]
-%%            [a,b]       [a]      4         [{k1, 0}, {k2, 6}, ...]
-%%            [a,c]       [a]      5         [{k1, 1}, {k2, 4}, ...]
-%%            [d,e]       [d]      6         [{k1, 2}, {k2, 3}, ...]
-%%
-%%
-%% When a key is inserted into the tree it is inserted into the leaf
-%% corresponding to the given prefix list. The leaf and its parents
-%% are not updated at this time. Instead, the leaf is added to a dirty
-%% set. The nodes are later updated in bulk.
-%%
-%% Updating the hashtree is a two-step process. First, a snapshot of
-%% the tree must be obtained. This prevents new writes from affecting
-%% the update. Snapshotting the tree will snapshot each dirty
-%% leaf. Since writes to nodes other than leaves only occur during
-%% updates, no snapshot is taken for them. Second, the tree is updated
-%% using the snapshot. The update is performed by updating the {@link
-%% hashtree} nodes at each level starting with the leaves. The top
-%% hash of each node in a level is inserted into its parent node after
-%% being updated. The list of dirty parents is then updated, moving up
-%% the tree. Once the root is reached and has been updated, the process
-%% is complete. This process is designed to minimize the traversal of
-%% the tree and ensure that each node is only updated once.
-%%
-%% The typical use for updating a tree is to compare it with another
-%% recently updated tree. Comparison is done with the ``compare/4''
-%% function. Compare provides a sort of fold over the differences of
-%% the tree, allowing callers to determine what to do with those
-%% differences. In addition, the caller can accumulate a value, such
-%% as the difference list or stats about differences.
-%%
-%% The tree implemented in this module assumes that it will be managed
-%% by a single process and that all calls will be made to it synchronously, with
-%% a couple of exceptions:
-%%
-%% 1. Updating a tree with a snapshot can be done in another process. The snapshot
-%% must be taken by the owning process, synchronously.
-%% 2. Comparing two trees may be done by a separate process. Compares should use
-%% a snapshot and only be performed after an update.
-%%
-%% The nodes in this tree are backed by LevelDB; however, this is
-%% most likely temporary and Cluster Metadata's use of the tree is
-%% ephemeral. Trees are only meant to live for the lifetime of a
-%% running node and are rebuilt on start. To ensure the tree is fresh
-%% each time, when nodes are created the backing LevelDB store is
-%% opened, closed, and then re-opened to ensure any lingering files
-%% are removed. Additionally, the nodes themselves (references to
-%% {@link hashtree}) are stored in {@link ets}.
-
-
--module(hashtree_tree).
-
--export([new/2,
-         destroy/1,
-         insert/4,
-         insert/5,
-         update_snapshot/1,
-         update_perform/1,
-         local_compare/2,
-         compare/4,
-         top_hash/1,
-         prefix_hash/2,
-         get_bucket/4,
-         key_hashes/3]).
-
--export_type([tree/0, tree_node/0, handler_fun/1, remote_fun/0]).
-
--record(hashtree_tree, {
-          %% the identifier for this tree. Used as part of the ids
-          %% passed to hashtree.erl and in keys used to store nodes in
-          %% the tree's ets tables.
-          id :: term(),
-
-          %% directory where nodes are stored on disk
-          data_root :: file:name_all(),
-
-          %% number of levels in the tree excluding leaves (height - 1)
-          num_levels :: non_neg_integer(),
-
-          %% ets table that holds hashtree nodes in the tree
-          nodes :: ets:tab(),
-
-          %% ets table that holds snapshot nodes
-          snapshot :: ets:tab(),
-
-          %% set of dirty leaves
-          dirty :: gb_sets:set()
-         }).
-
--define(ROOT, '$ht_root').
--define(NUM_LEVELS, 2).
-
--opaque tree()       :: #hashtree_tree{}.
--type prefix()       :: atom() | binary().
--type prefixes()     :: [prefix()].
--opaque tree_node()  :: prefixes() | ?ROOT.
--type prefix_diff()  :: {missing_prefix, local | remote, prefixes()}.
--type key_diffs()    :: {key_diffs, prefixes(),[{missing |
-                                                 remote_missing |
-                                                 different, binary()}]}.
--type diff()         :: prefix_diff() | key_diffs().
--type handler_fun(X) :: fun((diff(), X) -> X).
--type remote_fun()   :: fun((prefixes(),
-                             {get_bucket, {integer(), integer()}} |
-                             {key_hashes, integer()}) -> orddict:orddict()).
-
-%%%===================================================================
-%%% API
-%%%===================================================================
-
-%% @doc Creates a new hashtree.
-%%
-%% Takes the following options:
-%%   * num_levels - the height of the tree excluding leaves. Corresponds to the
-%%     length of the prefix list passed to {@link insert/5}.
-%%   * data_dir - the directory where the LevelDB instances for the nodes will
-%%     be stored.
--type new_opt_num_levels() :: {num_levels, non_neg_integer()}.
--type new_opt_data_dir()   :: {data_dir, file:name_all()}.
--type new_opt()            :: new_opt_num_levels() | new_opt_data_dir().
--type new_opts()           :: [new_opt()].
--spec new(term(), new_opts()) -> tree().
-new(TreeId, Opts) ->
-    NumLevels = proplists:get_value(num_levels, Opts, ?NUM_LEVELS),
-    DataRoot = data_root(Opts),
-    Tree = #hashtree_tree{id = TreeId,
-                          data_root = DataRoot,
-                          num_levels = NumLevels,
-                          %% table needs to be public to allow async update
-                          nodes = ets:new(undefined, [public]),
-                          snapshot = undefined,
-                          dirty = gb_sets:new()},
-    get_node(?ROOT, Tree),
-    Tree.
-
-%% @doc Destroys the tree, cleaning up any used resources.
-%% This deletes the LevelDB files for the nodes.
--spec destroy(tree()) -> ok.
-destroy(Tree) ->
-    ets:foldl(fun({_, Node}, _) ->
-                      Node1 = hashtree:close(Node),
-                      hashtree:destroy(Node1)
-              end, undefined, Tree#hashtree_tree.nodes),
-    catch ets:delete(Tree#hashtree_tree.nodes),
-    ok.
-
-%% @doc An alias for insert(Prefixes, Key, Hash, [], Tree).
--spec insert(prefixes(), binary(), binary(), tree()) -> tree() | {error, term()}.
-insert(Prefixes, Key, Hash, Tree) ->
-    insert(Prefixes, Key, Hash, [], Tree).
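
Concretely, the two-step update described in the module doc looks roughly like the following sketch (an editor's illustration, not part of this patch; `Tree0' is a hypothetical tree with dirty leaves): the owner snapshots synchronously, and the expensive update may then run in another process.

    Tree1 = hashtree_tree:update_snapshot(Tree0),
    Owner = self(),
    spawn_link(fun() ->
                       ok = hashtree_tree:update_perform(Tree1),
                       Owner ! tree_updated
               end),
    %% All further inserts must use Tree1, the tree returned by the snapshot.
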
-
-%% @doc Insert a hash into the tree. The length of `Prefixes' must
-%% correspond to the height of the tree -- the value used for
-%% `num_levels' when creating the tree. The hash is inserted into
-%% a leaf of the tree and that leaf is marked as dirty. The tree is not
-%% updated at this time. Future operations on the tree should use the
-%% tree returned by this function.
-%%
-%% Insert takes the following options:
-%%   * if_missing - if `true' then the hash is only inserted into the tree
-%%     if the key is not already present. This is useful for
-%%     ensuring writes concurrent with building the tree
-%%     take precedence over older values. `false' is the default
-%%     value.
--type insert_opt_if_missing() :: {if_missing, boolean()}.
--type insert_opt()            :: insert_opt_if_missing().
--type insert_opts()           :: [insert_opt()].
--spec insert(prefixes(), binary(), binary(), insert_opts(), tree()) -> tree() | {error, term()}.
-insert(Prefixes, Key, Hash, Opts, Tree) ->
-    NodeName = prefixes_to_node_name(Prefixes),
-    case valid_prefixes(NodeName, Tree) of
-        true ->
-            insert_hash(Key, Hash, Opts, NodeName, Tree);
-        false ->
-            {error, bad_prefixes}
-    end.
-
-%% @doc Snapshot the tree for updating. The returned tree should be
-%% updated using {@link update_perform/1} and used to perform future
-%% operations on the tree.
--spec update_snapshot(tree()) -> tree().
-update_snapshot(Tree=#hashtree_tree{dirty=Dirty,nodes=Nodes,snapshot=Snapshot0}) ->
-    catch ets:delete(Snapshot0),
-    FoldRes = gb_sets:fold(fun(DirtyName, Acc) ->
-                                   DirtyKey = node_key(DirtyName, Tree),
-                                   Node = lookup_node(DirtyName, Tree),
-                                   {DirtyNode, NewNode} = hashtree:update_snapshot(Node),
-                                   [{{DirtyKey, DirtyNode}, {DirtyKey, NewNode}} | Acc]
-                           end, [], Dirty),
-    {Snaps, NewNodes} = lists:unzip(FoldRes),
-    Snapshot = ets:new(undefined, []),
-    ets:insert(Snapshot, Snaps),
-    ets:insert(Nodes, NewNodes),
-    Tree#hashtree_tree{dirty=gb_sets:new(),snapshot=Snapshot}.
-
-
-%% @doc Update the tree with a snapshot obtained by {@link
-%% update_snapshot/1}. This function may be called by a process other
-%% than the one managing the tree.
--spec update_perform(tree()) -> ok.
-update_perform(Tree=#hashtree_tree{snapshot=Snapshot}) ->
-    DirtyParents = ets:foldl(fun(DirtyLeaf, DirtyParentsAcc) ->
-                                     update_dirty_leaves(DirtyLeaf, DirtyParentsAcc, Tree)
-                             end,
-                             gb_sets:new(), Snapshot),
-    update_dirty_parents(DirtyParents, Tree),
-    catch ets:delete(Snapshot),
-    ok.
-
-%% @doc Compare two local trees. This function is primarily for
-%% local debugging and testing.
--spec local_compare(tree(), tree()) -> [diff()].
-local_compare(T1, T2) ->
-    RemoteFun = fun(Prefixes, {get_bucket, {Level, Bucket}}) ->
-                        hashtree_tree:get_bucket(Prefixes, Level, Bucket, T2);
-                   (Prefixes, {key_hashes, Segment}) ->
-                        [{_, Hashes}] = hashtree_tree:key_hashes(Prefixes, Segment, T2),
-                        Hashes
-                end,
-    HandlerFun = fun(Diff, Acc) -> Acc ++ [Diff] end,
-    compare(T1, RemoteFun, HandlerFun, []).
-
-%% @doc Compare a local and remote tree. `RemoteFun' is used to
-%% access the buckets and segments of nodes in the remote
-%% tree. `HandlerFun' will be called for each difference found in the
-%% tree. A difference is either a missing local or remote prefix, or a
-%% list of key differences, which themselves signify different or
-%% missing keys. `HandlerAcc' is passed to the first call of
-%% `HandlerFun' and each subsequent call is passed the value returned
-%% by the previous call. The return value of this function is the
-%% return value from the last call to `HandlerFun'.
--spec compare(tree(), remote_fun(), handler_fun(X), X) -> X. -compare(LocalTree, RemoteFun, HandlerFun, HandlerAcc) -> - compare(?ROOT, 1, LocalTree, RemoteFun, HandlerFun, HandlerAcc). - -%% @doc Returns the top-hash of the tree. This is the top-hash of the -%% root node. --spec top_hash(tree()) -> undefined | binary(). -top_hash(Tree) -> - prefix_hash([], Tree). - -%% @doc Returns the top-hash of the node corresponding to the given -%% prefix list. The length of the prefix list can be less than or -%% equal to the height of the tree. If the tree has not been updated -%% or if the prefix list is not found or invalid, then `undefined' is -%% returned. Otherwise the hash value from the most recent update is -%% returned. --spec prefix_hash(prefixes(), tree()) -> undefined | binary(). -prefix_hash(Prefixes, Tree) -> - NodeName = prefixes_to_node_name(Prefixes), - case lookup_node(NodeName, Tree) of - undefined -> undefined; - Node -> extract_top_hash(hashtree:top_hash(Node)) - end. - -%% @doc Returns the {@link hashtree} buckets for a given node in the -%% tree. This is used primarily for accessing buckets of a remote tree -%% during compare. --spec get_bucket(tree_node(), integer(), integer(), tree()) -> orddict:orddict(). -get_bucket(Prefixes, Level, Bucket, Tree) -> - case lookup_node(prefixes_to_node_name(Prefixes), Tree) of - undefined -> orddict:new(); - Node -> hashtree:get_bucket(Level, Bucket, Node) - end. - -%% @doc Returns the {@link hashtree} segment hashes for a given node -%% in the tree. This is used primarily for accessing key hashes of a -%% remote tree during compare. --spec key_hashes(tree_node(), integer(), tree()) -> [{integer(), orddict:orddict()}]. -key_hashes(Prefixes, Segment, Tree) -> - case lookup_node(prefixes_to_node_name(Prefixes), Tree) of - undefined -> [{Segment, orddict:new()}]; - Node -> hashtree:key_hashes(Node, Segment) - end. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== - -%% @private -insert_hash(Key, Hash, Opts, NodeName, Tree) -> - Node = get_node(NodeName, Tree), - insert_hash(Key, Hash, Opts, NodeName, Node, Tree). - -%% @private -insert_hash(Key, Hash, Opts, NodeName, Node, Tree=#hashtree_tree{dirty=Dirty}) -> - Node2 = hashtree:insert(Key, Hash, Node, Opts), - Dirty2 = gb_sets:add_element(NodeName, Dirty), - _ = set_node(NodeName, Node2, Tree), - Tree#hashtree_tree{dirty=Dirty2}. - -%% @private -update_dirty_leaves({DirtyKey, DirtyNode}, DirtyParents, Tree) -> - update_dirty(node_key_to_name(DirtyKey), DirtyNode, DirtyParents, Tree). - -%% @private -update_dirty_parents(DirtyParents, Tree) -> - case gb_sets:is_empty(DirtyParents) of - true -> ok; - false -> - NextDirty = gb_sets:fold( - fun(DirtyParent, DirtyAcc) -> - DirtyNode = lookup_node(DirtyParent, Tree), - {DirtySnap, DirtyNode2} = hashtree:update_snapshot(DirtyNode), - NextDirty = update_dirty(DirtyParent, DirtySnap, DirtyAcc, Tree), - _ = set_node(DirtyParent, DirtyNode2, Tree), - NextDirty - end, gb_sets:new(), DirtyParents), - update_dirty_parents(NextDirty, Tree) - end. 
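
Because `compare/4' is a fold over differences, the handler can accumulate anything, not just a diff list. A minimal sketch (editor's illustration only; `LocalTree' and `RemoteFun' are assumed bindings) that counts differences instead of collecting them, using the two diff() shapes defined above:

    CountFun = fun({missing_prefix, _Side, _Prefixes}, N) -> N + 1;
                  ({key_diffs, _Prefixes, Diffs}, N) -> N + length(Diffs)
               end,
    NumDiffs = hashtree_tree:compare(LocalTree, RemoteFun, CountFun, 0).
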
-
-%% @private
-update_dirty(DirtyName, DirtyNode, NextDirty, Tree) ->
-    %% ignore returned tree b/c we are tracking dirty nodes in this fold separately
-    _ = hashtree:update_perform(DirtyNode),
-    case parent_node(DirtyName, Tree) of
-        undefined ->
-            NextDirty;
-        {ParentName, ParentNode} ->
-            TopHash = extract_top_hash(hashtree:top_hash(DirtyNode)),
-            ParentKey = to_parent_key(DirtyName),
-            %% ignore returned tree b/c we are tracking dirty nodes in this fold separately
-            _ = insert_hash(ParentKey, TopHash, [], ParentName, ParentNode, Tree),
-            gb_sets:add_element(ParentName, NextDirty)
-    end.
-
-%% @private
-compare(NodeName, Level, LocalTree, RemoteFun, HandlerFun, HandlerAcc)
-  when Level =:= LocalTree#hashtree_tree.num_levels + 1 ->
-    Prefixes = node_name_to_prefixes(NodeName),
-    LocalNode = lookup_node(NodeName, LocalTree),
-    RemoteNode = fun(Action, Info) ->
-                         RemoteFun(Prefixes, {Action, Info})
-                 end,
-    AccFun = fun(Diffs, CompareAcc) ->
-                     Res = HandlerFun({key_diffs, Prefixes, Diffs},
-                                      extract_compare_acc(CompareAcc, HandlerAcc)),
-                     [{acc, Res}]
-             end,
-    CompareRes = hashtree:compare(LocalNode, RemoteNode, AccFun, []),
-    extract_compare_acc(CompareRes, HandlerAcc);
-compare(NodeName, Level, LocalTree, RemoteFun, HandlerFun, HandlerAcc) ->
-    Prefixes = node_name_to_prefixes(NodeName),
-    LocalNode = lookup_node(NodeName, LocalTree),
-    RemoteNode = fun(Action, Info) ->
-                         RemoteFun(Prefixes, {Action, Info})
-                 end,
-    AccFoldFun = fun({missing, NodeKey}, HandlerAcc2) ->
-                         missing_prefix(NodeKey, local, HandlerFun, HandlerAcc2);
-                    ({remote_missing, NodeKey}, HandlerAcc2) ->
-                         missing_prefix(NodeKey, remote, HandlerFun, HandlerAcc2);
-                    ({different, NodeKey}, HandlerAcc2) ->
-                         compare(from_parent_key(NodeKey), Level+1, LocalTree,
-                                 RemoteFun, HandlerFun, HandlerAcc2)
-                 end,
-    AccFun = fun(Diffs, CompareAcc) ->
-                     Res = lists:foldl(AccFoldFun,
-                                       extract_compare_acc(CompareAcc, HandlerAcc), Diffs),
-                     [{acc, Res}]
-             end,
-    CompareRes = hashtree:compare(LocalNode, RemoteNode, AccFun, []),
-    extract_compare_acc(CompareRes, HandlerAcc).
-
-
-%% @private
-missing_prefix(NodeKey, Type, HandlerFun, HandlerAcc) ->
-    HandlerFun({missing_prefix, Type, node_name_to_prefixes(from_parent_key(NodeKey))},
-               HandlerAcc).
-%% @private
-extract_compare_acc([], HandlerAcc) ->
-    HandlerAcc;
-extract_compare_acc([{acc, Acc}], _HandlerAcc) ->
-    Acc.
-
-%% @private
-get_node(NodeName, Tree) ->
-    Node = lookup_node(NodeName, Tree),
-    get_node(NodeName, Node, Tree).
-
-%% @private
-get_node(NodeName, undefined, Tree) ->
-    create_node(NodeName, Tree);
-get_node(_NodeName, Node, _Tree) ->
-    Node.
-
-%% @private
-lookup_node(NodeName, Tree=#hashtree_tree{nodes=Nodes}) ->
-    NodeKey = node_key(NodeName, Tree),
-    case ets:lookup(Nodes, NodeKey) of
-        [] -> undefined;
-        [{NodeKey, Node}] -> Node
-    end.
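
Once a tree has been updated, hashes can be read back at any level of the prefix hierarchy. A short sketch (editor's illustration only; the prefixes follow the `[a, b]' example from the module doc):

    %% Each call returns undefined if the tree has not been updated yet.
    RootHash = hashtree_tree:top_hash(Tree),
    GroupHash = hashtree_tree:prefix_hash([a], Tree),
    LeafHash = hashtree_tree:prefix_hash([a, b], Tree).
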
-
-%% @private
-create_node(?ROOT, Tree) ->
-    NodeId = node_id(?ROOT, Tree),
-    NodePath = node_path(Tree),
-    NumSegs = node_num_segs(?ROOT),
-    Width = node_width(?ROOT),
-    Opts = [{segment_path, NodePath}, {segments, NumSegs}, {width, Width}],
-    %% destroy any data that previously existed because it's lingering from
-    %% a tree that was not properly destroyed
-    ok = hashtree:destroy(NodePath),
-    Node = hashtree:new(NodeId, Opts),
-    set_node(?ROOT, Node, Tree);
-create_node([], Tree) ->
-    create_node(?ROOT, Tree);
-create_node(NodeName, Tree) ->
-    NodeId = node_id(NodeName, Tree),
-    RootNode = get_node(?ROOT, Tree),
-    NumSegs = node_num_segs(NodeName),
-    Width = node_width(NodeName),
-    Opts = [{segments, NumSegs}, {width, Width}],
-    %% share segment store across all nodes
-    Node = hashtree:new(NodeId, RootNode, Opts),
-    set_node(NodeName, Node, Tree).
-
-%% @private
-set_node(NodeName, Node, Tree) when is_list(NodeName) orelse NodeName =:= ?ROOT ->
-    set_node(node_key(NodeName, Tree), Node, Tree);
-set_node(NodeKey, Node, #hashtree_tree{nodes=Nodes}) when is_tuple(NodeKey) ->
-    ets:insert(Nodes, [{NodeKey, Node}]),
-    Node.
-
-%% @private
-parent_node(?ROOT, _Tree) ->
-    %% root has no parent
-    undefined;
-parent_node([_Single], Tree) ->
-    %% parent of first level is the root
-    {?ROOT, get_node(?ROOT, Tree)};
-parent_node([_Prefix | Parent], Tree) ->
-    %% parent of subsequent level is tail of node name
-    {Parent, get_node(Parent, Tree)}.
-
-%% @private
-node_width(?ROOT) ->
-    256;
-node_width(NodeName) ->
-    case length(NodeName) < 2 of
-        true -> 512;
-        false -> 1024
-    end.
-
-%% @private
-node_num_segs(?ROOT) ->
-    256 * 256;
-node_num_segs(NodeName) ->
-    case length(NodeName) < 2 of
-        true -> 512 * 512;
-        false -> 1024 * 1024
-    end.
-
-%% @private
-node_path(#hashtree_tree{data_root=DataRoot}) ->
-    DataRoot.
-
-%% @private
-node_key(NodeName, #hashtree_tree{id=TreeId}) ->
-    {TreeId, NodeName}.
-
-%% @private
-node_key_to_name({_TreeId, NodeName}) ->
-    NodeName.
-
-%% @private
-node_id(?ROOT, #hashtree_tree{id=TreeId}) ->
-    {TreeId, <<0:176/integer>>};
-node_id(NodeName, #hashtree_tree{id=TreeId}) ->
-    <<NodeMD5:128/integer>> = crypto:hash(md5, (term_to_binary(NodeName))),
-    {TreeId, <<NodeMD5:176/integer>>}.
-
-%% @private
-to_parent_key(NodeName) ->
-    term_to_binary(NodeName).
-
-%% @private
-from_parent_key(NodeKey) ->
-    binary_to_term(NodeKey).
-
-%% @private
-valid_prefixes(NodeName, #hashtree_tree{num_levels=NumLevels}) ->
-    length(NodeName) =:= NumLevels.
-
-%% @private
-prefixes_to_node_name([]) ->
-    ?ROOT;
-prefixes_to_node_name(Prefixes) ->
-    lists:reverse(Prefixes).
-
-%% @private
-node_name_to_prefixes(?ROOT) ->
-    [];
-node_name_to_prefixes(NodeName) ->
-    lists:reverse(NodeName).
-
-%% @private
-extract_top_hash([]) ->
-    undefined;
-extract_top_hash([{0, Hash}]) ->
-    Hash.
-
-%% @private
-data_root(Opts) ->
-    case proplists:get_value(data_dir, Opts) of
-        undefined ->
-            Base = "/tmp/hashtree_tree",
-            <<P:128/integer>> = crypto:hash(md5, term_to_binary(erlang:now())),
-            filename:join(Base, integer_to_list(P, 16));
-        Root -> Root
-    end.
diff --git a/src/plumtree_app.erl b/src/plumtree_app.erl
index 92f030a..4f3a659 100644
--- a/src/plumtree_app.erl
+++ b/src/plumtree_app.erl
@@ -25,7 +25,6 @@
 -export([start/2, stop/1]).
start(_StartType, _StartArgs) -> - _State = plumtree_peer_service_manager:init(), case plumtree_sup:start_link() of {ok, Pid} -> %% do nothing for now diff --git a/src/plumtree_broadcast.erl b/src/plumtree_broadcast.erl index 5649886..37a00da 100644 --- a/src/plumtree_broadcast.erl +++ b/src/plumtree_broadcast.erl @@ -37,10 +37,15 @@ debug_get_peers/3, debug_get_tree/2]). - %% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-include("plumtree.hrl"). -define(SERVER, ?MODULE). @@ -111,8 +116,7 @@ %% to generate membership updates as the ring changes. -spec start_link() -> {ok, pid()} | ignore | {error, term()}. start_link() -> - {ok, LocalState} = plumtree_peer_service_manager:get_local_state(), - Members = riak_dt_orswot:value(LocalState), + {ok, Members} = plumtree_peer_service_manager:members(), {InitEagers, InitLazys} = init_peers(Members), Mods = app_helper:get_env(plumtree, broadcast_mods, [plumtree_metadata_manager]), Res = start_link(Members, InitEagers, InitLazys, Mods), @@ -216,15 +220,12 @@ debug_get_tree(Root, Nodes) -> %%%=================================================================== %% @private --spec init([[nodename()]]) -> {ok, #state{}} | - {ok, #state{}, non_neg_integer() | infinity} | - ignore | - {stop, term()}. +-spec init([[any()],...]) -> {ok, #state{}}. init([AllMembers, InitEagers, InitLazys, Mods]) -> schedule_lazy_tick(), schedule_exchange_tick(), State1 = #state{ - outstanding = orddict:new(), + outstanding = orddict:new(), mods = lists:usort(Mods), exchanges=[] }, @@ -232,13 +233,7 @@ init([AllMembers, InitEagers, InitLazys, Mods]) -> {ok, State2}. %% @private --spec handle_call(term(), {pid(), term()}, #state{}) -> - {reply, term(), #state{}} | - {reply, term(), #state{}, non_neg_integer()} | - {noreply, #state{}} | - {noreply, #state{}, non_neg_integer()} | - {stop, term(), term(), #state{}} | - {stop, term(), #state{}}. +-spec handle_call(term(), {pid(), term()}, #state{}) -> {reply, term(), #state{}}. handle_call({get_peers, Root}, _From, State) -> EagerPeers = all_peers(Root, State#state.eager_sets, State#state.common_eagers), LazyPeers = all_peers(Root, State#state.lazy_sets, State#state.common_lazys), @@ -252,9 +247,7 @@ handle_call({cancel_exchanges, WhichExchanges}, _From, State) -> {reply, Cancelled, State}. %% @private --spec handle_cast(term(), #state{}) -> {noreply, #state{}} | - {noreply, #state{}, non_neg_integer()} | - {stop, term(), #state{}}. +-spec handle_cast(term(), #state{}) -> {noreply, #state{}}. handle_cast({broadcast, MessageId, Message, Mod}, State) -> State1 = eager_push(MessageId, Message, Mod, State), State2 = schedule_lazy_push(MessageId, Mod, State1), @@ -278,7 +271,7 @@ handle_cast({graft, MessageId, Mod, Round, Root, From}, State) -> State1 = handle_graft(Result, MessageId, Mod, Round, Root, From, State), {noreply, State1}; handle_cast({update, LocalState}, State=#state{all_members=BroadcastMembers}) -> - Members = riak_dt_orswot:value(LocalState), + Members = ?SET:value(LocalState), CurrentMembers = ordsets:from_list(Members), New = ordsets:subtract(CurrentMembers, BroadcastMembers), Removed = ordsets:subtract(BroadcastMembers, CurrentMembers), @@ -292,9 +285,8 @@ handle_cast({update, LocalState}, State=#state{all_members=BroadcastMembers}) -> {noreply, State2}. 
%% @private --spec handle_info(term(), #state{}) -> {noreply, #state{}} | - {noreply, #state{}, non_neg_integer()} | - {stop, term(), #state{}}. +-spec handle_info('exchange_tick' | 'lazy_tick' | {'DOWN', _, 'process', _, _}, #state{}) -> + {noreply, #state{}}. handle_info(lazy_tick, State) -> schedule_lazy_tick(), _ = send_lazy(State), @@ -464,7 +456,6 @@ exchange_filter({mod, Mod}) -> Mod =:= ExchangeMod end. - %% picks random root uniformly random_root(#state{all_members=Members}) -> random_other_node(Members). diff --git a/src/plumtree_metadata_manager.erl b/src/plumtree_metadata_manager.erl index a5dc7c8..0dddc34 100644 --- a/src/plumtree_metadata_manager.erl +++ b/src/plumtree_metadata_manager.erl @@ -321,9 +321,7 @@ exchange(Peer) -> %% @private -spec init([mm_opts()]) -> {ok, #state{}} | - {ok, #state{}, non_neg_integer() | infinity} | - ignore | - {stop, term()}. + {stop, no_data_dir}. init([Opts]) -> case data_root(Opts) of undefined -> @@ -342,13 +340,7 @@ init([Opts]) -> end. %% @private --spec handle_call(term(), {pid(), term()}, #state{}) -> - {reply, term(), #state{}} | - {reply, term(), #state{}, non_neg_integer()} | - {noreply, #state{}} | - {noreply, #state{}, non_neg_integer()} | - {stop, term(), term(), #state{}} | - {stop, term(), #state{}}. +-spec handle_call(term(), {pid(), term()}, #state{}) -> {reply, term(), #state{}}. handle_call({put, PKey, Context, ValueOrFun}, _From, State) -> {Result, NewState} = read_modify_write(PKey, Context, ValueOrFun, State), {reply, Result, NewState}; @@ -382,16 +374,12 @@ handle_call({is_stale, PKey, Context}, _From, State) -> {reply, IsStale, State}. %% @private --spec handle_cast(term(), #state{}) -> {noreply, #state{}} | - {noreply, #state{}, non_neg_integer()} | - {stop, term(), #state{}}. +-spec handle_cast(term(), #state{}) -> {noreply, #state{}}. handle_cast(_Msg, State) -> {noreply, State}. %% @private --spec handle_info(term(), #state{}) -> {noreply, #state{}} | - {noreply, #state{}, non_neg_integer()} | - {stop, term(), #state{}}. +-spec handle_info({'DOWN', _, 'process', _, _}, #state{}) -> {noreply, #state{}}. handle_info({'DOWN', ItRef, process, _Pid, _Reason}, State) -> close_remote_iterator(ItRef, State), {noreply, State}. diff --git a/src/plumtree_peer_service.erl b/src/plumtree_peer_service.erl index abd0305..4b291b8 100644 --- a/src/plumtree_peer_service.erl +++ b/src/plumtree_peer_service.erl @@ -27,8 +27,9 @@ attempt_join/2, leave/1, stop/0, - stop/1 - ]). + stop/1]). + +-include("plumtree.hrl"). %% @doc prepare node to join a cluster join(Node) -> @@ -40,7 +41,7 @@ join(NodeStr, Auto) when is_list(NodeStr) -> join(Node, Auto) when is_atom(Node) -> join(node(), Node, Auto). -%% @doc Initiate join. Nodes cannot join themselves. +%% @doc Initiate join. Nodes cannot join themselves. join(Node, Node, _) -> {error, self_join}; join(_, Node, _Auto) -> @@ -59,25 +60,25 @@ attempt_join(Node) -> attempt_join(Node, Local) -> {ok, Remote} = gen_server:call({plumtree_peer_service_gossip, Node}, send_state), - Merged = riak_dt_orswot:merge(Remote, Local), + Merged = ?SET:merge(Remote, Local), _ = plumtree_peer_service_manager:update_state(Merged), %% broadcast to all nodes %% get peer list - Members = riak_dt_orswot:value(Merged), + Members = ?SET:value(Merged), _ = [gen_server:cast({plumtree_peer_service_gossip, P}, {receive_state, Merged}) || P <- Members, P /= node()], ok. 
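
End to end, the join/gossip flow above reduces to one call per node. A sketch (editor's illustration only; node names are hypothetical, and both nodes must be running plumtree):

    %% On plumtree1@127.0.0.1, join the other node, then confirm
    %% membership through the peer service manager:
    ok = plumtree_peer_service:join('plumtree2@127.0.0.1'),
    {ok, Members} = plumtree_peer_service_manager:members().
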
- + leave(_Args) when is_list(_Args) -> {ok, Local} = plumtree_peer_service_manager:get_local_state(), {ok, Actor} = plumtree_peer_service_manager:get_actor(), - {ok, Leave} = riak_dt_orswot:update({remove, node()}, Actor, Local), + {ok, Leave} = ?SET:update({remove, node()}, Actor, Local), case random_peer(Leave) of {ok, Peer} -> {ok, Remote} = gen_server:call({plumtree_peer_service_gossip, Peer}, send_state), - Merged = riak_dt_orswot:merge(Leave, Remote), + Merged = ?SET:merge(Leave, Remote), _ = gen_server:cast({plumtree_peer_service_gossip, Peer}, {receive_state, Merged}), {ok, Remote2} = gen_server:call({plumtree_peer_service_gossip, Peer}, send_state), - Remote2List = riak_dt_orswot:value(Remote2), + Remote2List = ?SET:value(Remote2), case [P || P <- Remote2List, P =:= node()] of [] -> %% leaving the cluster shuts down the node @@ -100,7 +101,7 @@ stop(Reason) -> init:stop(). random_peer(Leave) -> - Members = riak_dt_orswot:value(Leave), + Members = ?SET:value(Leave), Peers = [P || P <- Members], case Peers of [] -> diff --git a/src/plumtree_peer_service_console.erl b/src/plumtree_peer_service_console.erl index e250aa7..1f5f6b8 100644 --- a/src/plumtree_peer_service_console.erl +++ b/src/plumtree_peer_service_console.erl @@ -22,9 +22,10 @@ -export([members/1]). +-include("plumtree.hrl"). + members([]) -> - {ok, LocalState} = plumtree_peer_service_manager:get_local_state(), - Members = riak_dt_orswot:value(LocalState), + {ok, Members} = plumtree_peer_service_manager:members(), print_members(Members). print_members(Members) -> diff --git a/src/plumtree_peer_service_gossip.erl b/src/plumtree_peer_service_gossip.erl index 4b9b54b..2ed3d27 100644 --- a/src/plumtree_peer_service_gossip.erl +++ b/src/plumtree_peer_service_gossip.erl @@ -24,10 +24,19 @@ -define(GOSSIP_INTERVAL, 15000). --export([start_link/0, stop/0]). +-export([start_link/0, + stop/0]). + -export([receive_state/1]). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-include("plumtree.hrl"). %%%================================================================== %%% gen_server api @@ -59,12 +68,12 @@ handle_call(send_state, _From, State) -> handle_cast({receive_state, PeerState}, State) -> {ok, LocalState} = plumtree_peer_service_manager:get_local_state(), - case riak_dt_orswot:equal(PeerState, LocalState) of + case ?SET:equal(PeerState, LocalState) of true -> %% do nothing {noreply, State}; false -> - Merged = riak_dt_orswot:merge(PeerState, LocalState), + Merged = ?SET:merge(PeerState, LocalState), plumtree_peer_service_manager:update_state(Merged), plumtree_peer_service_events:update(Merged), {noreply, State} @@ -103,7 +112,7 @@ do_gossip() -> %% @doc returns a list of peer nodes get_peers(Local) -> - Members = riak_dt_orswot:value(Local), + Members = ?SET:value(Local), Peers = [X || X <- Members, X /= node()], Peers. diff --git a/src/plumtree_peer_service_manager.erl b/src/plumtree_peer_service_manager.erl index 19f4a03..60e0f4e 100644 --- a/src/plumtree_peer_service_manager.erl +++ b/src/plumtree_peer_service_manager.erl @@ -20,76 +20,142 @@ -module(plumtree_peer_service_manager). --define(TBL, cluster_state). - --export([init/0, get_local_state/0, get_actor/0, update_state/1, delete_state/0]). 
- -init() -> - %% setup ETS table for cluster_state - _ = try ets:new(?TBL, [named_table, public, set, {keypos, 1}]) of - _Res -> - gen_actor(), - maybe_load_state_from_disk(), - ok - catch - error:badarg -> - lager:warning("Table ~p already exists", [?TBL]) - %%TODO rejoin logic - end, - ok. +-behaviour(gen_server). + +%% API +-export([start_link/0, + members/0, + get_local_state/0, + get_actor/0, + update_state/1, + delete_state/0]). + +%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-include("plumtree.hrl"). + +-type actor() :: binary(). +-type membership() :: ?SET:orswot(). + +-record(state, {actor :: actor(), + membership :: membership() }). -%% @doc return local node's view of cluster membership +%%%=================================================================== +%%% API +%%%=================================================================== + +%% @doc Same as start_link([]). +-spec start_link() -> {ok, pid()} | ignore | {error, term()}. +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +%% @doc Return membership list. +members() -> + gen_server:call(?MODULE, members, infinity). + +%% @doc Return local node's view of cluster membership. get_local_state() -> - case hd(ets:lookup(?TBL, cluster_state)) of - {cluster_state, State} -> - {ok, State}; - _Else -> - {error, _Else} - end. - -%% @doc return local node's current actor + gen_server:call(?MODULE, get_local_state, infinity). + +%% @doc Return local node's current actor. get_actor() -> - case hd(ets:lookup(?TBL, actor)) of - {actor, Actor} -> - {ok, Actor}; - _Else -> - {error, _Else} - end. + gen_server:call(?MODULE, get_actor, infinity). -%% @doc update cluster_state +%% @doc Update cluster state. update_state(State) -> - write_state_to_disk(State), - ets:insert(?TBL, {cluster_state, State}). + gen_server:call(?MODULE, {update_state, State}, infinity). +%% @doc Delete state. delete_state() -> - delete_state_from_disk(). + gen_server:call(?MODULE, delete_state, infinity). -%%% ------------------------------------------------------------------ -%%% internal functions -%%% ------------------------------------------------------------------ +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== -%% @doc initialize singleton cluster -add_self() -> - Initial = riak_dt_orswot:new(), - Actor = ets:lookup(?TBL, actor), - {ok, LocalState} = riak_dt_orswot:update({add, node()}, Actor, Initial), - update_state(LocalState). +%% @private +-spec init([]) -> {ok, #state{}}. +init([]) -> + Actor = gen_actor(), + Membership = maybe_load_state_from_disk(Actor), + {ok, #state{actor=Actor, membership=Membership}}. -%% @doc generate an actor for this node while alive +%% @private +-spec handle_call(term(), {pid(), term()}, #state{}) -> {reply, term(), #state{}}. 
+handle_call(members, _From, #state{membership=Membership}=State) -> + {reply, {ok, ?SET:value(Membership)}, State}; +handle_call(get_local_state, _From, #state{membership=Membership}=State) -> + {reply, {ok, Membership}, State}; +handle_call(get_actor, _From, #state{actor=Actor}=State) -> + {reply, {ok, Actor}, State}; +handle_call({update_state, NewState}, _From, #state{membership=Membership}=State) -> + Merged = ?SET:merge(Membership, NewState), + persist_state(Merged), + {reply, ok, State#state{membership=Merged}}; +handle_call(delete_state, _From, State) -> + delete_state_from_disk(), + {reply, ok, State}; +handle_call(Msg, _From, State) -> + lager:warning("Unhandled messages: ~p", [Msg]), + {reply, ok, State}. + +%% @private +-spec handle_cast(term(), #state{}) -> {noreply, #state{}}. +handle_cast(Msg, State) -> + lager:warning("Unhandled messages: ~p", [Msg]), + {noreply, State}. + +%% @private +-spec handle_info(term(), #state{}) -> {noreply, #state{}}. +handle_info(Msg, State) -> + lager:warning("Unhandled messages: ~p", [Msg]), + {noreply, State}. + +%% @private +-spec terminate(term(), #state{}) -> term(). +terminate(_Reason, _State) -> + ok. + +%% @private +-spec code_change(term() | {down, term()}, #state{}, term()) -> {ok, #state{}}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +%% @private +empty_membership(Actor) -> + Initial = ?SET:new(), + {ok, LocalState} = ?SET:update({add, node()}, Actor, Initial), + persist_state(LocalState), + LocalState. + +%% @private gen_actor() -> Node = atom_to_list(node()), - {M, S, U} = now(), - TS = integer_to_list(M * 1000 * 1000 * 1000 * 1000 + S * 1000 * 1000 + U), + Unique = time_compat:unique_integer([positive]), + TS = integer_to_list(Unique), Term = Node ++ TS, - Actor = crypto:hash(sha, Term), - ets:insert(?TBL, {actor, Actor}). + crypto:hash(sha, Term). +%% @private data_root() -> case application:get_env(plumtree, plumtree_data_dir) of - {ok, PRoot} -> filename:join(PRoot, "peer_service"); - undefined -> undefined + {ok, PRoot} -> + filename:join(PRoot, "peer_service"); + undefined -> + undefined end. +%% @private write_state_to_disk(State) -> case data_root() of undefined -> @@ -97,14 +163,12 @@ write_state_to_disk(State) -> Dir -> File = filename:join(Dir, "cluster_state"), ok = filelib:ensure_dir(File), - lager:info("writing state ~p to disk ~p", - [State, riak_dt_orswot:to_binary(State)]), - ok = file:write_file(File, - riak_dt_orswot:to_binary(State)) + ok = file:write_file(File, ?SET:to_binary(State)) end. +%% @private delete_state_from_disk() -> - case data_root() of + case data_root() of undefined -> ok; Dir -> @@ -118,19 +182,22 @@ delete_state_from_disk() -> end end. -maybe_load_state_from_disk() -> +%% @private +maybe_load_state_from_disk(Actor) -> case data_root() of undefined -> - add_self(); + empty_membership(Actor); Dir -> case filelib:is_regular(filename:join(Dir, "cluster_state")) of true -> - {ok, Bin} = file:read_file(filename:join(Dir, - "cluster_state")), - {ok, State} = riak_dt_orswot:from_binary(Bin), - lager:info("read state from file ~p~n", [State]), - update_state(State); + {ok, Bin} = file:read_file(filename:join(Dir, "cluster_state")), + {ok, State} = ?SET:from_binary(Bin), + State; false -> - add_self() + empty_membership(Actor) end end. + +%% @private +persist_state(State) -> + write_state_to_disk(State). 
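
With the manager rewritten as a gen_server, all membership reads and writes now go through its API instead of the old cluster_state ETS table. A usage sketch (editor's illustration only; ?SET resolves to the ORSWOT module):

    {ok, Actor} = plumtree_peer_service_manager:get_actor(),
    {ok, Local} = plumtree_peer_service_manager:get_local_state(),
    {ok, Local2} = ?SET:update({add, node()}, Actor, Local),
    ok = plumtree_peer_service_manager:update_state(Local2),
    {ok, Members} = plumtree_peer_service_manager:members().
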
diff --git a/src/plumtree_sup.erl b/src/plumtree_sup.erl index 01f7165..14ee635 100644 --- a/src/plumtree_sup.erl +++ b/src/plumtree_sup.erl @@ -35,6 +35,7 @@ start_link() -> init([]) -> Children = lists:flatten( [ + ?CHILD(plumtree_peer_service_manager, worker), ?CHILD(plumtree_peer_service_gossip, worker), ?CHILD(plumtree_peer_service_events, worker), ?CHILD(plumtree_broadcast, worker), diff --git a/test/cluster_membership_SUITE.erl b/test/cluster_membership_SUITE.erl index ead5a52..f058614 100644 --- a/test/cluster_membership_SUITE.erl +++ b/test/cluster_membership_SUITE.erl @@ -19,6 +19,7 @@ %% ------------------------------------------------------------------- -module(cluster_membership_SUITE). +-compile({parse_transform, lager_transform}). -export([ %% suite/0, @@ -55,9 +56,9 @@ init_per_suite(_Config) -> {ok, Hostname} = inet:gethostname(), case net_kernel:start([list_to_atom("runner@"++Hostname), shortnames]) of {ok, _} -> ok; - {error, {already_started, _}} -> ok + {error, {already_started, _}} -> ok; + {error, {{already_started, _},_}} -> ok end, - lager:info("node name ~p", [node()]), _Config. end_per_suite(_Config) -> @@ -68,7 +69,6 @@ init_per_testcase(Case, Config) -> Nodes = plumtree_test_utils:pmap(fun(N) -> plumtree_test_utils:start_node(N, Config, Case) end, [jaguar, shadow, thorn, pyros]), - {ok, _} = ct_cover:add_nodes(Nodes), [{nodes, Nodes}|Config]. end_per_testcase(_, _Config) -> @@ -81,7 +81,6 @@ all() -> singleton_test(Config) -> Nodes = proplists:get_value(nodes, Config), - ok = ct_cover:remove_nodes(Nodes), [[Node] = plumtree_test_utils:get_cluster_members(Node) || Node <- Nodes], ok. diff --git a/test/metadata_SUITE.erl b/test/metadata_SUITE.erl index 7b30517..ed06248 100644 --- a/test/metadata_SUITE.erl +++ b/test/metadata_SUITE.erl @@ -19,6 +19,7 @@ %% ------------------------------------------------------------------- -module(metadata_SUITE). +-compile({parse_transform, lager_transform}). -export([ %% suite/0, @@ -51,9 +52,9 @@ init_per_suite(_Config) -> {ok, Hostname} = inet:gethostname(), case net_kernel:start([list_to_atom("runner@"++Hostname), shortnames]) of {ok, _} -> ok; - {error, {already_started, _}} -> ok + {error, {already_started, _}} -> ok; + {error, {{already_started, _},_}} -> ok end, - lager:info("node name ~p", [node()]), _Config. end_per_suite(_Config) -> @@ -64,7 +65,6 @@ init_per_testcase(Case, Config) -> Nodes = plumtree_test_utils:pmap(fun(N) -> plumtree_test_utils:start_node(N, Config, Case) end, [electra, katana, flail, gargoyle]), - {ok, _} = ct_cover:add_nodes(Nodes), [{nodes, Nodes}|Config]. end_per_testcase(_, _Config) -> diff --git a/test/plumtree_test_utils.erl b/test/plumtree_test_utils.erl index ccb7b16..65faac5 100644 --- a/test/plumtree_test_utils.erl +++ b/test/plumtree_test_utils.erl @@ -20,22 +20,23 @@ -module(plumtree_test_utils). --export([ - get_cluster_members/1, - pmap/2, - wait_until/3, - wait_until_left/2, - wait_until_joined/2, - wait_until_offline/1, - wait_until_disconnected/2, - wait_until_connected/2, - start_node/3, - partition_cluster/2, - heal_cluster/2 - ]). +-export([get_cluster_members/1, + pmap/2, + wait_until/3, + wait_until_left/2, + wait_until_joined/2, + wait_until_offline/1, + wait_until_disconnected/2, + wait_until_connected/2, + start_node/3, + partition_cluster/2, + heal_cluster/2]). + +-include("plumtree.hrl"). + get_cluster_members(Node) -> {Node, {ok, Res}} = {Node, rpc:call(Node, plumtree_peer_service_manager, get_local_state, [])}, - riak_dt_orswot:value(Res). + ?SET:value(Res). 
pmap(F, L) -> Parent = self(), diff --git a/tools.mk b/tools.mk index 4dcd0a3..2e05887 100644 --- a/tools.mk +++ b/tools.mk @@ -1,56 +1,7 @@ -# ------------------------------------------------------------------- -# -# Copyright (c) 2014 Basho Technologies, Inc. -# -# This file is provided to you under the Apache License, -# Version 2.0 (the "License"); you may not use this file -# except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -# ------------------------------------------------------------------- - -# ------------------------------------------------------------------- -# NOTE: This file is is from https://github.com/basho/tools.mk. -# It should not be edited in a project. It should simply be updated -# wholesale when a new version of tools.mk is released. -# ------------------------------------------------------------------- - -REBAR ?= ./rebar -REVISION ?= $(shell git rev-parse --short HEAD) -PROJECT ?= $(shell basename `find src -name "*.app.src"` .app.src) -DEP_DIR ?= "deps" -EBIN_DIR ?= "ebin" - -.PHONY: compile-no-deps test docs xref dialyzer-run dialyzer-quick dialyzer \ - cleanplt upload-docs - -compile-no-deps: - ${REBAR} compile skip_deps=true - -test: compile - ${REBAR} eunit skip_deps=true - -upload-docs: docs - @if [ -z "${BUCKET}" -o -z "${PROJECT}" -o -z "${REVISION}" ]; then \ - echo "Set BUCKET, PROJECT, and REVISION env vars to upload docs"; \ - exit 1; fi - @cd doc; s3cmd put -P * "s3://${BUCKET}/${PROJECT}/${REVISION}/" > /dev/null - @echo "Docs built at: http://${BUCKET}.s3-website-us-east-1.amazonaws.com/${PROJECT}/${REVISION}" +REBAR ?= ./rebar3 docs: ${REBAR} doc skip_deps=true xref: compile ${REBAR} xref skip_deps=true - -dialyzer: compile - ${REBAR} dialyzer