Merge branch 'merge/corfurl-master'

This commit is contained in:
Scott Lystig Fritchie 2015-03-02 18:12:46 +09:00
commit 4dd3ccf10c
24 changed files with 3725 additions and 0 deletions

7
prototype/corfurl/.gitignore vendored Normal file
View file

@ -0,0 +1,7 @@
.eunit
.eqc-info
current_counterexample.eqc
deps
ebin/*.beam
ebin/*.app
erl_crash.dump

View file

@ -0,0 +1,26 @@
REBAR_BIN := $(shell which rebar)
ifeq ($(REBAR_BIN),)
REBAR_BIN = ./rebar
endif
.PHONY: rel deps package pkgclean
all: deps compile
compile:
$(REBAR_BIN) compile
deps:
$(REBAR_BIN) get-deps
clean:
$(REBAR_BIN) clean
test: deps compile eunit
eunit:
$(REBAR_BIN) -v skip_deps=true eunit
pulse: compile
env USE_PULSE=1 $(REBAR_BIN) skip_deps=true clean compile
env USE_PULSE=1 $(REBAR_BIN) skip_deps=true -D PULSE eunit

View file

@ -0,0 +1,17 @@
This is a repo that has other stuff that Greg Burd was noodling
around with wrt distributed indexing. I haven't bothered weeding
any of it out, sorry!
The corfurl code is in the 'src' and 'include' directories. In
addition, there are docs here:
https://github.com/basho/corfurl/blob/master/docs/corfurl.md
This is a README-style collection of CORFU-related papers,
building instructions, and testing instructions.
https://github.com/basho/corfurl/tree/master/docs/corfurl/notes
https://github.com/basho/corfurl/tree/master/docs/corfurl/notes#two-clients-try-to-write-the-exact-same-data-at-the-same-time-to-the-same-lpn
The above are some notes about testing problems & solutions that
I was/am/?? hoping might find their way into a paper someday.

View file

@ -0,0 +1,191 @@
## CORFU papers
I recommend the "5 pages" paper below first, to give a flavor of
what the CORFU is about. When Scott first read the CORFU paper
back in 2011 (and the Hyder paper), he thought it was insanity.
He recommends waiting before judging quite so hastily. :-)
After that, then perhaps take a step back are skim over the
Hyder paper. Hyder started before CORFU, but since CORFU, the
Hyder folks at Microsoft have rewritten Hyder to use CORFU as
the shared log underneath it. But the Hyder paper has lots of
interesting bits about how you'd go about creating a distributed
DB where the transaction log *is* the DB.
### "CORFU: A Distributed Shared Log"
MAHESH BALAKRISHNAN, DAHLIA MALKHI, JOHN D. DAVIS, and VIJAYAN
PRABHAKARAN, Microsoft Research Silicon Valley, MICHAEL WEI,
University of California, San Diego, TED WOBBER, Microsoft Research
Silicon Valley
Long version of introduction to CORFU (~30 pages)
http://www.snookles.com/scottmp/corfu/corfu.a10-balakrishnan.pdf
### "CORFU: A Shared Log Design for Flash Clusters"
Same authors as above
Short version of introduction to CORFU paper above (~12 pages)
http://www.snookles.com/scottmp/corfu/corfu-shared-log-design.nsdi12-final30.pdf
### "From Paxos to CORFU: A Flash-Speed Shared Log"
Same authors as above
5 pages, a short summary of CORFU basics and some trial applications
that have been implemented on top of it.
http://www.snookles.com/scottmp/corfu/paxos-to-corfu.malki-acmstyle.pdf
### "Beyond Block I/O: Implementing a Distributed Shared Log in Hardware"
Wei, Davis, Wobber, Balakrishnan, Malkhi
Summary report of implmementing the CORFU server-side in
FPGA-style hardware. (~11 pages)
http://www.snookles.com/scottmp/corfu/beyond-block-io.CameraReady.pdf
### "Tango: Distributed Data Structures over a Shared Log"
Balakrishnan, Malkhi, Wobber, Wu, Brabhakaran, Wei, Davis, Rao, Zou, Zuck
Describes a framework for developing data structures that reside
persistently within a CORFU log: the log *is* the database/data
structure store.
http://www.snookles.com/scottmp/corfu/Tango.pdf
### "Dynamically Scalable, Fault-Tolerant Coordination on a Shared Logging Service"
Wei, Balakrishnan, Davis, Malkhi, Prabhakaran, Wobber
The ZooKeeper inter-server communication is replaced with CORFU.
Faster, fewer lines of code than ZK, and more features than the
original ZK code base.
http://www.snookles.com/scottmp/corfu/zookeeper-techreport.pdf
### "Hyder A Transactional Record Manager for Shared Flash"
Bernstein, Reid, Das
Describes a distributed log-based DB system where the txn log is
treated quite oddly: a "txn intent" record is written to a
shared common log All participants read the shared log in
parallel and make commit/abort decisions in parallel, based on
what conflicts (or not) that they see in the log. Scott's first
reading was "No way, wacky" ... and has since changed his mind.
http://www.snookles.com/scottmp/corfu/CIDR11Proceedings.pdf
pages 9-20
## Fiddling with PULSE
Do the following:
make clean
make
make pulse
... then watch the dots go across the screen for 60 seconds. If you
wish, you can press `Control-c` to interrupt the test. We're really
interested in the build artifacts.
erl -pz .eunit deps/*/ebin
eqc:quickcheck(eqc:testing_time(5, corfurl_pulse:prop_pulse())).
This will run the PULSE test for 5 seconds. Feel free to adjust for
as many seconds as you wish.
Erlang R16B02-basho4 (erts-5.10.3) [source] [64-bit] [smp:8:8] [async-threads:10] [hipe] [kernel-poll:false] [dtrace]
Eshell V5.10.3 (abort with ^G)
1> eqc:quickcheck(eqc:testing_time(5, corfurl_pulse:prop_pulse())).
Starting Quviq QuickCheck version 1.30.4
(compiled at {{2014,2,7},{9,19,50}})
Licence for Basho reserved until {{2014,2,17},{1,41,39}}
......................................................................................
OK, passed 86 tests
schedule: Count: 86 Min: 2 Max: 1974 Avg: 3.2e+2 Total: 27260
true
2>
REPL interactive work can be done via:
1. Edit code, e.g. `corfurl_pulse.erl`.
2. Run `env BITCASK_PULSE=1 ./rebar skip_deps=true -D PULSE eunit suites=SKIP`
to compile.
3. Reload any recompiled modules, e.g. `l(corfurl_pulse).`
4. Resume QuickCheck activities.
## Seeing an PULSE scheduler interleaving failure in action
1. Edit `corfurl_pulse:check_trace()` to uncomment the
use of `conjunction()` that mentions `bogus_order_check_do_not_use_me`
and comment out the real `conjunction()` call below it.
2. Recompile & reload.
3. Check.
For example:
9> eqc:quickcheck(eqc:testing_time(5, corfurl_pulse:prop_pulse())).
.........Failed! After 9 tests.
Sweet! The first tuple below are the first `?FORALL()` values,
and the 2nd is the list of commands,
`{SequentialCommands, ListofParallelCommandLists}`. The 3rd is the
seed used to perturb the PULSE scheduler.
In this case, `SequentialCommands` has two calls (to `setup()` then
`append()`) and there are two parallel procs: one makes 1 call
call to `append()` and the other makes 2 calls to `append()`.
{2,2,9}
{{[{set,{var,1},{call,corfurl_pulse,setup,[2,2,9]}}],
[[{set,{var,3},
{call,corfurl_pulse,append,
[{var,1},<<231,149,226,203,10,105,54,223,147>>]}}],
[{set,{var,2},
{call,corfurl_pulse,append,
[{var,1},<<7,206,146,75,249,13,154,238,110>>]}},
{set,{var,4},
{call,corfurl_pulse,append,
[{var,1},<<224,121,129,78,207,23,79,216,36>>]}}]]},
{27492,46961,4884}}
Here are our results:
simple_result: passed
errors: passed
events: failed
identity: passed
bogus_order_check_do_not_use_me: failed
[{ok,1},{ok,3},{ok,2}] /= [{ok,1},{ok,2},{ok,3}]
Our (bogus!) order expectation was violated. Shrinking!
simple_result: passed
errors: passed
events: failed
identity: passed
bogus_order_check_do_not_use_me: failed
[{ok,1},{ok,3},{ok,2}] /= [{ok,1},{ok,2},{ok,3}]
Shrinking was able to remove two `append()` calls and to shrink the
size of the pages down from 9 bytes down to 1 byte.
Shrinking........(8 times)
{1,1,1}
{{[{set,{var,1},{call,corfurl_pulse,setup,[1,1,1]}}],
[[{set,{var,3},{call,corfurl_pulse,append,[{var,1},<<0>>]}}],
[{set,{var,4},{call,corfurl_pulse,append,[{var,1},<<0>>]}}]]},
{27492,46961,4884}}
events: failed
bogus_order_check_do_not_use_me: failed
[{ok,2},{ok,1}] /= [{ok,1},{ok,2}]
false

View file

@ -0,0 +1,35 @@
msc {
client1, FLU1, FLU2, client2, client3;
client1 box client3 [label="Epoch #1: chain = FLU1 -> FLU2"];
client1 -> FLU1 [label="{write,epoch1,<<Page YYY>>}"];
client1 <- FLU1 [label="ok"];
client1 box client1 [label="Client crash", textcolour="red"];
FLU1 box FLU1 [label="FLU crash", textcolour="red"];
client1 box client3 [label="Epoch #2: chain = FLU2"];
client2 -> FLU2 [label="{write,epoch2,<<Page ZZZ>>}"];
client2 <- FLU2 [label="ok"];
client3 box client3 [label="Read repair starts", textbgcolour="aqua"];
client3 -> FLU2 [label="{read,epoch2}"];
client3 <- FLU2 [label="{ok,<<Page ZZZ>>}"];
client3 -> FLU1 [label="{write,epoch2,<<Page ZZZ>>}"];
FLU1 box FLU1 [label="What do we do here? Our current value is <<Page YYY>>.", textcolour="red"] ;
FLU1 box FLU1 [label="If we do not accept the repair value, then we are effectively UNREPAIRABLE.", textcolour="red"] ;
FLU1 box FLU1 [label="If we do accept the repair value, then we are mutating an already-written value.", textcolour="red"] ;
FLU1 -> client3 [label="I'm sorry, Dave, I cannot do that."];
FLU1 box FLU1 [label = "In theory, while repair is still happening, nobody will ever ask FLU1 for its value.", textcolour="black"] ;
client3 -> FLU1 [label="{write,epoch2,<<Page ZZZ>>,repair,witnesses=[FLU2]}", textbgcolour="silver"];
FLU1 box FLU1 [label="Start an async process to ask the witness list to corroborate this repair."];
FLU1 -> FLU2 [label="{read,epoch2}", textbgcolour="aqua"];
FLU1 <- FLU2 [label="{ok,<<Page ZZ>>}", textbgcolour="aqua"];
FLU1 box FLU1 [label="Overwrite local storage with repair page.", textbgcolour="silver"];
client3 <- FLU1 [label="Async proc replies: ok", textbgcolour="silver"];
}

View file

@ -0,0 +1,92 @@
## read-repair-race.1.
First attempt at using "mscgen" to make some Message Sequence
Chart (MSC) for a race found at commit 087c2605ab.
## read-repair-race.2.
Second attempt. This is almost exactly the trace that is
generated by this failing test case at commit 087c2605ab:
C2 = [{1,2,1},{{[{set,{var,1},{call,corfurl_pulse,setup,[1,2,1,standard]}}],[[{set,{var,3},{call,corfurl_pulse,append,[{var,1},<<0>>]}}],[{set,{var,2},{call,corfurl_pulse,read_approx,[{var,1},6201864198]}},{set,{var,5},{call,corfurl_pulse,append,[{var,1},<<0>>]}}],[{set,{var,4},{call,corfurl_pulse,append,[{var,1},<<0>>]}},{set,{var,6},{call,corfurl_pulse,trim,[{var,1},510442857]}}]]},{25152,1387,78241}},[{events,[[{no_bad_reads,[]}]]}]].
eqc:check(corfurl_pulse:prop_pulse(), C2).
## read-repair-race.2b.*
Same basic condition as read-repair-race.2, but edited
substantially to make it clearer what is happening.
Also for commit 087c2605ab.
I believe that I have a fix for the silver-colored
`error-overwritten` ... and it was indeed added to the code soon
afterward, but it turns out that it doesn't solve the entire problem
of "two clients try to write the exact same data at the same time to
the same LPN".
## "Two Clients Try to Write the Exact Same Data at the Same Time to the Same LPN"
This situation is something that CORFU cannot protect against, IMO.
I have been struggling for a while, to try to find a way for CORFU
clients to know *always* when there is a conflict with another
writer. It usually works: the basic nature of write-once registers is
very powerful. However, in the case where two clients are trying to
write the same page data to the same LPN, it looks impossible to
resolve.
How do you tell the difference between:
1. A race between a client A writing page P at address LPN and
read-repair fixing P. P *is* A's data and no other's, so this race
doesn't confuse anyone.
1. A race between a client A writing page P at address LPN and client
B writing the exact same page data P at the same LPN.
A's page P = B's page P, but clients A & B don't know that.
If CORFU tells both A & B that they were successful, A & B assume
that the CORFU log has two new pages appended to it, but in truth
only one new page was appended.
If we try to solve this by always avoiding the same LPN address
conflict, we are deluding ourselves. If we assume that the sequencer
is 100% correct in that it never assigns the same LPN twice, and if we
assume that a client must never write a block without an assignment
from the sequencer, then the problem is solved. But the problem has a
_heavy_ price: the log is only available when the sequencer is
available, and only when never more than one sequencer running at a
time.
The CORFU base system promises correct operation, even if:
* Zero sequencers are running, and clients might choose the same LPN
to write to.
* Two more more sequencers are running, and different sequencers
assign the same LPN to two different clients.
But CORFU's "correct" behavior does not include detecting the same
page at the same LPN. The papers don't specifically say it, alas.
But IMO it's impossible to guarantee, so all docs ought to explicitly
say that it's impossible and that clients must not assume it.
See also
* two-clients-race.1.png
## A scenario of chain repair & write-once registers
See:
* 2014-02-27.chain-repair-write-twice.png
... for a scenario where write-once registers that are truly only
write-once-ever-for-the-rest-of-the-future are "inconvenient" when it
comes to chain repair. Client 3 is attempting to do chain repair ops,
bringing FLU1 back into sync with FLU2.
The diagram proposes one possible idea for making overwriting a
read-once register a bit safer: ask another node in the chain to
verify that the page you've been asked to repair is exactly the same
as that other FLU's page.

View file

@ -0,0 +1,49 @@
msc {
"<0.12583.0>" [label="Client1"], "<0.12574.0>" [label="FLU1"], "<0.12575.0>" [label="FLU2"], "<0.12576.0>" [label="FLU3"], "<0.12584.0>" [label="Client2"], "<0.12585.0>" [label="Client3"];
"<0.12585.0>" -> "<0.12576.0>" [ label = "{read,1,1}" ] ;
"<0.12583.0>" -> "<0.12574.0>" [ label = "{write,1,1,<<0>>}" ] ;
"<0.12576.0>" -> "<0.12585.0>" [ label = "error_unwritten" ] ;
"<0.12585.0>" abox "<0.12585.0>" [ label="Read Repair starts", textbgcolour="yellow"];
"<0.12585.0>" -> "<0.12574.0>" [ label = "{read,1,1}" ] ;
"<0.12574.0>" -> "<0.12583.0>" [ label = "ok" ] ;
"<0.12583.0>" -> "<0.12575.0>" [ label = "{write,1,1,<<0>>}" ] ;
"<0.12574.0>" -> "<0.12585.0>" [ label = "{ok,<<0>>}" ,textcolour="red"] ;
"<0.12585.0>" -> "<0.12575.0>" [ label = "{write,1,1,<<0>>}" ] ;
"<0.12575.0>" -> "<0.12585.0>" [ label = "ok" ] ;
"<0.12585.0>" -> "<0.12576.0>" [ label = "{write,1,1,<<0>>}" ] ;
"<0.12575.0>" -> "<0.12583.0>" [ label = "error_overwritten" ] ;
"<0.12583.0>" abox "<0.12583.0>" [ label = "Race with read repair? Read to double-check", textbgcolour="yellow" ] ;
"<0.12583.0>" -> "<0.12575.0>" [ label = "{read,1,1}" ] ;
"<0.12576.0>" -> "<0.12585.0>" [ label = "ok" ] ;
"<0.12585.0>" abox "<0.12585.0>" [ label="Read Repair SUCCESS", textbgcolour="green"];
"<0.12585.0>" abox "<0.12585.0>" [ label="Our problem: the PULSE model never believes that append_page ever wrote LPN 1", textcolour="red"];
"<0.12584.0>" abox "<0.12584.0>" [ label = "Client2 decides to trim LPN 1", textbgcolour="orange" ] ;
"<0.12584.0>" -> "<0.12574.0>" [ label = "{trim,1,1}" ] ;
"<0.12575.0>" -> "<0.12583.0>" [ label = "{ok,<<0>>}"] ;
"<0.12583.0>" abox "<0.12583.0>" [ label = "Value matches, yay!", textbgcolour="yellow" ] ;
"<0.12583.0>" abox "<0.12583.0>" [ label = "Continue writing", textbgcolour="yellow" ] ;
"<0.12583.0>" -> "<0.12576.0>" [ label = "{write,1,1,<<0>>}" ] ;
"<0.12574.0>" -> "<0.12584.0>" [ label = "ok" ] ;
"<0.12584.0>" -> "<0.12575.0>" [ label = "{trim,1,1}" ] ;
"<0.12576.0>" -> "<0.12583.0>" [ label = "error_overwritten" ] ;
"<0.12583.0>" abox "<0.12583.0>" [ label = "Race with read repair? Read to double-check", textbgcolour="yellow" ] ;
"<0.12583.0>" -> "<0.12576.0>" [ label = "{read,1,1}" ] ;
"<0.12575.0>" -> "<0.12584.0>" [ label = "ok" ] ;
"<0.12584.0>" -> "<0.12576.0>" [ label = "{trim,1,1}" ] ;
"<0.12576.0>" -> "<0.12584.0>" [ label = "ok" ] ;
"<0.12576.0>" -> "<0.12583.0>" [ label = "error_trimmed" ] ;
"<0.12583.0>" abox "<0.12583.0>" [ label = "Value MISMATCH!", textcolour="red" ] ;
"<0.12583.0>" abox "<0.12583.0>" [ label = "Read repair", textbgcolour="yellow" ] ;
"<0.12583.0>" -> "<0.12574.0>" [ label = "{read,1,1}" ] ;
"<0.12574.0>" -> "<0.12583.0>" [ label = "error_trimmed" ] ;
"<0.12583.0>" -> "<0.12575.0>" [ label = "{fill,1,1}" ] ;
"<0.12575.0>" -> "<0.12583.0>" [ label = "error_trimmed" ] ;
"<0.12583.0>" -> "<0.12576.0>" [ label = "{fill,1,1}" ] ;
"<0.12576.0>" -> "<0.12583.0>" [ label = "error_trimmed" ] ;
"<0.12583.0>" abox "<0.12583.0>" [ label = "At this point, we give up on LPN 1.", textcolour="red" ] ;
"<0.12583.0>" abox "<0.12583.0>" [ label = "Sequencer gives us LPN 2", textbgcolour="yellow" ] ;
"<0.12583.0>" abox "<0.12583.0>" [ label = "LPN 2 has been filled (not shown).", textbgcolour="yellow" ] ;
"<0.12583.0>" abox "<0.12583.0>" [ label = "Sequencer gives us LPN 3", textbgcolour="yellow" ] ;
"<0.12583.0>" abox "<0.12583.0>" [ label = "We write LPN 3 successfully", textbgcolour="green" ] ;
}

View file

@ -0,0 +1,60 @@
msc {
"<0.32555.4>" [label="Client1"], "<0.32551.4>" [label="FLU1"], "<0.32552.4>" [label="FLU2"], "<0.32556.4>" [label="Client2"], "<0.32557.4>" [label="Client3"];
"<0.32555.4>" abox "<0.32555.4>" [ label = "Writer", textbgcolour="orange"],
"<0.32556.4>" abox "<0.32556.4>" [ label = "Reader", textbgcolour="orange"],
"<0.32557.4>" abox "<0.32557.4>" [ label = "Trimmer", textbgcolour="orange"];
"<0.32555.4>" abox "<0.32555.4>" [ label = "append_page()", textbgcolour="yellow"] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Sequencer assigns LPN 1", textbgcolour="yellow"] ;
"<0.32555.4>" -> "<0.32551.4>" [ label = "{write,1,1,<<0>>}" ] ;
"<0.32556.4>" abox "<0.32556.4>" [ label = "read_page(LPN 1)", textbgcolour="yellow"] ;
"<0.32556.4>" -> "<0.32552.4>" [ label = "{read,1,1}" ] ;
"<0.32552.4>" -> "<0.32556.4>" [ label = "error_unwritten" ] ;
"<0.32556.4>" abox "<0.32556.4>" [ label = "Start read repair", textbgcolour="aqua"] ;
"<0.32556.4>" -> "<0.32551.4>" [ label = "{read,1,1}" ] ;
"<0.32551.4>" -> "<0.32555.4>" [ label = "ok" ] ;
"<0.32551.4>" -> "<0.32556.4>" [ label = "{ok,<<0>>}" ] ;
"<0.32556.4>" -> "<0.32552.4>" [ label = "{write,1,1,<<0>>}" ] ;
"<0.32555.4>" -> "<0.32552.4>" [ label = "{write,1,1,<<0>>}" ] ;
"<0.32557.4>" -> "<0.32551.4>" [ label = "{trim,1,1}" ] ;
"<0.32552.4>" -> "<0.32555.4>" [ label = "error_overwritten" ] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Our attempt to write LPN 1 is interrupted", textbgcolour="yellow"] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Check if an eager read-repair has written our data for us.", textbgcolour="yellow"] ;
"<0.32555.4>" -> "<0.32552.4>" [ label = "{read,1,1}" ] ;
"<0.32551.4>" -> "<0.32557.4>" [ label = "ok" ] ;
"<0.32552.4>" -> "<0.32556.4>" [ label = "ok" ] ;
"<0.32556.4>" abox "<0.32556.4>" [ label = "End read repair", textbgcolour="aqua"] ;
"<0.32556.4>" abox "<0.32556.4>" [ label = "read_page(LPN 1) -> {ok, <<0>>}", textbgcolour="yellow"] ;
"<0.32556.4>" abox "<0.32556.4>" [ label = "See red stuff at bottom....", textcolour="red"] ;
# "<0.32556.4>" abox "<0.32556.4>" [ label = "But PULSE thinks that LPN 1 was never written.", textcolour="red"] ;
# "<0.32556.4>" abox "<0.32556.4>" [ label = "Fixing this requires ... lots of pondering...", textcolour="red"] ;
"<0.32557.4>" -> "<0.32552.4>" [ label = "{trim,1,1}" ] ;
"<0.32552.4>" -> "<0.32557.4>" [ label = "ok" ] ;
"<0.32552.4>" -> "<0.32555.4>" [ label = "error_trimmed" ] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Wow, an eager trimmer got us, ouch.", textbgcolour="yellow"] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Start read repair", textbgcolour="aqua"] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Read repair here is for sanity checking, not really necessary.", textbgcolour="yellow"] ;
"<0.32555.4>" -> "<0.32551.4>" [ label = "{read,1,1}" ] ;
"<0.32551.4>" -> "<0.32555.4>" [ label = "error_trimmed" ] ;
"<0.32555.4>" -> "<0.32552.4>" [ label = "{fill,1,1}" ] ;
"<0.32552.4>" -> "<0.32555.4>" [ label = "error_trimmed" ] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "End read repair", textbgcolour="aqua"] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Our attempt to write LPN 1 has failed. Must ask sequencer for a new LPN.", textbgcolour="yellow"] ;
"<0.32551.4>" abox "<0.32552.4>" [ label = "LPN 2 is written (race details omitted)", textbgcolour="orange"] ;
"<0.32551.4>" abox "<0.32552.4>" [ label = "LPN 3 is written (race details omitted)", textbgcolour="orange"] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Sequencer assigns LPN 4", textbgcolour="yellow"] ;
"<0.32555.4>" -> "<0.32551.4>" [ label = "{write,1,4,<<0>>}" ] ;
"<0.32551.4>" -> "<0.32555.4>" [ label = "ok" ] ;
"<0.32555.4>" -> "<0.32552.4>" [ label = "{write,1,4,<<0>>}" ] ;
"<0.32552.4>" -> "<0.32555.4>" [ label = "ok" ] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "append_page() -> LPN 4", textbgcolour="yellow"] ;
"<0.32555.4>" abox "<0.32557.4>" [ label="Small problem: the PULSE model never believes that append_page ever wrote LPN 1", textcolour="red"];
"<0.32556.4>" abox "<0.32556.4>" [ label = "read_page(LPN 1)", textbgcolour="yellow"] ;
"<0.32556.4>" -> "<0.32552.4>" [ label = "{read,1,4}" ] ;
"<0.32552.4>" -> "<0.32556.4>" [ label = "{ok,<<0>>}" ] ;
"<0.32556.4>" abox "<0.32556.4>" [ label = "read_page(LPN 4) -> {ok, <<0>>}", textbgcolour="yellow"] ;
"<0.32555.4>" abox "<0.32557.4>" [ label="Big problem: Client2 has witnessed the same page written at LPN 1 and at LPN 4.", textcolour="red"];
"<0.32555.4>" abox "<0.32557.4>" [ label="", textcolour="red"];
"<0.32555.4>" abox "<0.32557.4>" [ label="", textcolour="red"];
}

View file

@ -0,0 +1,57 @@
msc {
"<0.32555.4>" [label="Client1"], "<0.32551.4>" [label="FLU1=Head"], "<0.32552.4>" [label="FLU2=Tail"], "<0.32556.4>" [label="Client2"], "<0.32557.4>" [label="Client3"];
"<0.32555.4>" abox "<0.32555.4>" [ label = "Writer", textbgcolour="orange"],
"<0.32556.4>" abox "<0.32556.4>" [ label = "Reader", textbgcolour="orange"],
"<0.32557.4>" abox "<0.32557.4>" [ label = "Trimmer", textbgcolour="orange"];
"<0.32555.4>" abox "<0.32555.4>" [ label = "append_page()", textbgcolour="yellow"] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Sequencer assigns LPN 1", textbgcolour="yellow"] ;
"<0.32555.4>" -> "<0.32551.4>" [ label = "{write,1,1,<<0>>}" ] ;
"<0.32551.4>" -> "<0.32555.4>" [ label = "ok" ] ;
"<0.32556.4>" abox "<0.32556.4>" [ label = "read_page(LPN 1)", textbgcolour="yellow"] ;
"<0.32556.4>" -> "<0.32552.4>" [ label = "{read,1,1}" ] ;
"<0.32552.4>" -> "<0.32556.4>" [ label = "error_unwritten" ] ;
"<0.32556.4>" abox "<0.32556.4>" [ label = "Start read repair", textbgcolour="aqua"] ;
"<0.32556.4>" -> "<0.32551.4>" [ label = "{read,1,1}" ] ;
"<0.32551.4>" -> "<0.32556.4>" [ label = "{ok,<<0>>}" ] ;
"<0.32556.4>" -> "<0.32552.4>" [ label = "{write,1,1,<<0>>}" ] ;
"<0.32552.4>" -> "<0.32556.4>" [ label = "ok" ] ;
"<0.32556.4>" abox "<0.32556.4>" [ label = "End read repair", textbgcolour="aqua"] ;
"<0.32556.4>" abox "<0.32556.4>" [ label = "read_page(LPN 1) -> {ok, <<0>>}", textbgcolour="yellow"] ;
"<0.32556.4>" abox "<0.32556.4>" [ label = "See red stuff at bottom....", textcolour="red"] ;
# "<0.32556.4>" abox "<0.32556.4>" [ label = "But PULSE thinks that LPN 1 was never written.", textcolour="red"] ;
# "<0.32556.4>" abox "<0.32556.4>" [ label = "Fixing this requires ... lots of pondering...", textcolour="red"] ;
"<0.32557.4>" -> "<0.32551.4>" [ label = "{trim,1,1}" ] ;
"<0.32551.4>" -> "<0.32557.4>" [ label = "ok" ] ;
"<0.32557.4>" -> "<0.32552.4>" [ label = "{trim,1,1}" ] ;
"<0.32552.4>" -> "<0.32557.4>" [ label = "ok" ] ;
"<0.32555.4>" -> "<0.32552.4>" [ label = "{write,1,1,<<0>>}" ] ;
"<0.32552.4>" -> "<0.32555.4>" [ label = "error_overwritten", textbgcolour="silver" ] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Our attempt to write LPN 1 is interrupted", textbgcolour="yellow"] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Check if an eager read-repair has written our data for us.", textbgcolour="yellow"] ;
"<0.32555.4>" -> "<0.32552.4>" [ label = "{read,1,1}" ] ;
"<0.32552.4>" -> "<0.32555.4>" [ label = "error_trimmed" ] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Wow, an eager trimmer got us, ouch.", textbgcolour="yellow"] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Start read repair", textbgcolour="aqua"] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Read repair here is for sanity checking, not really necessary.", textbgcolour="yellow"] ;
"<0.32555.4>" -> "<0.32551.4>" [ label = "{read,1,1}" ] ;
"<0.32551.4>" -> "<0.32555.4>" [ label = "error_trimmed" ] ;
"<0.32555.4>" -> "<0.32552.4>" [ label = "{fill,1,1}" ] ;
"<0.32552.4>" -> "<0.32555.4>" [ label = "error_trimmed" ] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "End read repair", textbgcolour="aqua"] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Our attempt to write LPN 1 has failed. Must ask sequencer for a new LPN.", textbgcolour="yellow"] ;
"<0.32551.4>" abox "<0.32552.4>" [ label = "LPN 2 and 3 are written (race details omitted)", textbgcolour="orange"] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Sequencer assigns LPN 4", textbgcolour="yellow"] ;
"<0.32555.4>" -> "<0.32551.4>" [ label = "{write,1,4,<<0>>}" ] ;
"<0.32551.4>" -> "<0.32555.4>" [ label = "ok" ] ;
"<0.32555.4>" -> "<0.32552.4>" [ label = "{write,1,4,<<0>>}" ] ;
"<0.32552.4>" -> "<0.32555.4>" [ label = "ok" ] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "append_page() -> LPN 4", textbgcolour="yellow"] ;
"<0.32555.4>" abox "<0.32557.4>" [ label="Small problem: the PULSE model never believes that append_page ever wrote LPN 1", textcolour="red"];
"<0.32556.4>" abox "<0.32556.4>" [ label = "read_page(LPN 1)", textbgcolour="yellow"] ;
"<0.32556.4>" -> "<0.32552.4>" [ label = "{read,1,4}" ] ;
"<0.32552.4>" -> "<0.32556.4>" [ label = "{ok,<<0>>}" ] ;
"<0.32556.4>" abox "<0.32556.4>" [ label = "read_page(LPN 4) -> {ok, <<0>>}", textbgcolour="yellow"] ;
"<0.32555.4>" abox "<0.32557.4>" [ label="Big problem: Client2 has witnessed the same page written at LPN 1 and at LPN 4.", textcolour="red"];
}

View file

@ -0,0 +1,33 @@
msc {
client1, FLU1, FLU2, client2, client3;
client1 -> FLU1 [label="{write,epoch1,<<Not unique page>>}"];
client1 <- FLU1 [label="ok"];
client3 -> FLU2 [label="{seal,epoch1}"];
client3 <- FLU2 [label="{ok,...}"];
client3 -> FLU1 [label="{seal,epoch1}"];
client3 <- FLU1 [label="{ok,...}"];
client2 -> FLU1 [label="{write,epoch1,<<Not unique page>>}"];
client2 <- FLU1 [label="error_epoch"];
client2 abox client2 [label="Ok, get the new epoch info....", textbgcolour="silver"];
client2 -> FLU1 [label="{write,epoch2,<<Not unique page>>}"];
client2 <- FLU1 [label="error_overwritten"];
client1 -> FLU2 [label="{write,epoch1,<<Not unique page>>}"];
client1 <- FLU2 [label="error_epoch"];
client1 abox client1 [label="Ok, hrm.", textbgcolour="silver"];
client3 abox client3 [ label = "Start read repair", textbgcolour="aqua"] ;
client3 -> FLU1 [label="{read,epoch2}"];
client3 <- FLU1 [label="{ok,<<Not unique page>>}"];
client3 -> FLU2 [label="{write,epoch2,<<Not unique page>>}"];
client3 <- FLU2 [label="ok"];
client3 abox client3 [ label = "End read repair", textbgcolour="aqua"] ;
client3 abox client3 [ label = "We saw <<Not unique page>>", textbgcolour="silver"] ;
client1 -> FLU2 [label="{write,epoch2,<<Not unique page>>}"];
client1 <- FLU2 [label="error_overwritten"];
}

View file

@ -0,0 +1,45 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-type flu_name() :: atom().
-type flu() :: pid() | flu_name().
-type flu_chain() :: [flu()].
-type seq_name() :: {'undefined' | pid(), atom(), atom()}.
-record(range, {
pn_start :: non_neg_integer(), % start page number
pn_end :: non_neg_integer(), % end page number
chains :: [flu_chain()]
}).
-record(proj, { % Projection
dir :: string(),
epoch :: non_neg_integer(),
seq :: 'undefined' | seq_name(),
r :: [#range{}]
}).
%% 1 byte @ offset 0: 0=unwritten, 1=written, 2=trimmed, 255=corrupt? TODO
%% 8 bytes @ offset 1: logical page number
%% P bytes @ offset 9: page data
%% 1 byte @ offset 9+P: 0=unwritten, 1=written
-define(PAGE_OVERHEAD, (1 + 8 + 1)).

View file

@ -0,0 +1,55 @@
PulseBuild = case os:getenv("USE_PULSE") of
false ->
false;
_ ->
true
end,
case PulseBuild of
true ->
PulseOpts =
[{pulse_no_side_effect,
[{erlang,display,1}
]},
{pulse_side_effect,
[ {corfurl_sequencer, get, '_'}
, {corfurl_flu, write, '_'}
, {corfurl_flu, read, '_'}
, {corfurl_flu, seal, '_'}
, {corfurl_flu, trim, '_'}
, {corfurl_flu, fill, '_'}
, {corfurl, read_projection, '_'}
, {corfurl, save_projection, '_'}
, {prim_file, '_', '_'}
, {file, '_', '_'}
, {filelib, '_', '_'}
, {os, '_', '_'} ]},
{pulse_replace_module,
[ {gen_server, pulse_gen_server}
, {application, pulse_application}
, {supervisor, pulse_supervisor} ]}
],
PulseCFlags = [{"CFLAGS", "$CFLAGS -DPULSE"}],
UpdConfig = case lists:keysearch(eunit_compile_opts, 1, CONFIG) of
{value, {eunit_compile_opts, Opts}} ->
lists:keyreplace(eunit_compile_opts,
1,
CONFIG,
{eunit_compile_opts, Opts ++ PulseOpts});
_ ->
[{eunit_compile_opts, PulseOpts} | CONFIG]
end,
case lists:keysearch(port_env, 1, UpdConfig) of
{value, {port_env, PortEnv}} ->
lists:keyreplace(port_env,
1,
UpdConfig,
{port_env, PortEnv ++ PulseCFlags});
_ ->
[{port_env, PulseCFlags} | UpdConfig]
end;
false ->
CONFIG
end.

View file

@ -0,0 +1,354 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(corfurl).
-export([new_simple_projection/5,
new_range/3,
read_projection/2,
save_projection/2,
latest_projection_epoch_number/1]).
-export([write_page/3, read_page/2, scan_forward/3,
fill_page/2, trim_page/2]).
-include("corfurl.hrl").
-ifdef(TEST).
-compile(export_all).
-ifdef(PULSE).
-compile({parse_transform, pulse_instrument}).
-endif.
-endif.
%%% Debugging: for extra events in the PULSE event log, use the 2nd statement.
-define(EVENT_LOG(X), ok).
%%% -define(EVENT_LOG(X), event_logger:event(X)).
write_page(#proj{epoch=Epoch} = P, LPN, Page) ->
Chain = project_to_chain(LPN, P),
write_page_to_chain(Chain, Chain, Epoch, LPN, Page, 1).
write_page_to_chain(Chain, Chain, Epoch, LPN, Page, Nth) ->
write_page_to_chain(Chain, Chain, Epoch, LPN, Page, Nth, ok).
write_page_to_chain([], _Chain, _Epoch, _LPN, _Page, _Nth, Reply) ->
Reply;
write_page_to_chain([FLU|Rest], Chain, Epoch, LPN, Page, Nth, Reply) ->
case corfurl_flu:write(flu_pid(FLU), Epoch, LPN, Page) of
ok ->
write_page_to_chain(Rest, Chain, Epoch, LPN, Page, Nth+1, Reply);
error_badepoch ->
%% TODO: Interesting case: there may be cases where retrying with
%% a new epoch & that epoch's projection is just fine (and
%% we'll succeed) and cases where retrying will fail.
%% Figure out what those cases are, then for the
%% destined-to-fail case, try to clean up (via trim?)?
error_badepoch;
error_trimmed when Nth == 1 ->
%% Whoa, partner, you're movin' kinda fast for a trim.
%% This might've been due to us being too slow and someone
%% else junked us.
error_trimmed;
error_trimmed when Nth > 1 ->
%% We're racing with a trimmer. We won the race at head,
%% but here in the middle or tail (Nth > 1), we lost.
%% Our strategy is keep racing down to the tail.
%% If we continue to lose the exact same race for the rest
%% of the chain, the 1st clause of this func will return 'ok'.
%% That is *exactly* our intent and purpose!
write_page_to_chain(Rest, Chain, Epoch, LPN, Page, Nth+1, {special_trimmed, LPN});
error_overwritten when Nth == 1 ->
%% The sequencer lied, or we didn't use the sequencer and
%% guessed and guessed poorly, or someone is accidentally
%% trying to take our page. Shouganai, these things happen.
error_overwritten;
error_overwritten when Nth > 1 ->
%% The likely cause is that another reader has noticed that
%% we haven't finished writing this page in this chain and
%% has repaired the remainder of the chain while we were
%% drinking coffee. Let's double-check.
case corfurl_flu:read(flu_pid(FLU), Epoch, LPN) of
{ok, AlreadyThere} when AlreadyThere =:= Page ->
%% Alright, well, let's go continue the repair/writing,
%% since we agree on the page's value.
write_page_to_chain(Rest, Chain, Epoch, LPN, Page, Nth+1, Reply);
error_badepoch ->
%% TODO: same TODO as the above error_badepoch case.
error_badepoch;
error_trimmed ->
%% This is the same as 'error_trimmed when Nth > 1' above.
%% Do the same thing.
write_page_to_chain(Rest, Chain, Epoch, LPN, Page, Nth+1, {special_trimmed, LPN});
Else ->
%% Can PULSE can drive us to this case?
giant_error({left_off_here, ?MODULE, ?LINE, Else, nth, Nth})
end
end.
read_page(#proj{epoch=Epoch} = P, LPN) ->
Chain = project_to_chain(LPN, P),
Tail = lists:last(Chain),
case corfurl_flu:read(flu_pid(Tail), Epoch, LPN) of
{ok, _} = OK ->
OK;
error_badepoch ->
error_badepoch;
error_trimmed ->
%% TODO: A sanity/should-never-happen check would be to
%% see if everyone else in the chain are also trimmed.
error_trimmed;
error_unwritten ->
%% TODO: During scan_forward(), this pestering of the upstream
%% nodes in the chain is possibly-excessive-work.
%% For now, we'll assume that we always want to repair.
read_repair_chain(Epoch, LPN, Chain)
%% Let it crash: error_overwritten
end.
ok_or_trim(ok) ->
ok;
ok_or_trim(error_trimmed) ->
ok;
ok_or_trim(Else) ->
Else.
read_repair_chain(Epoch, LPN, Chain) ->
try
read_repair_chain1(Epoch, LPN, Chain)
catch
throw:{i_give_up,Res} ->
Res
end.
read_repair_chain1(Epoch, LPN, [Head|Rest] = Chain) ->
?EVENT_LOG({read_repair, LPN, Chain, i_am, self()}),
case corfurl_flu:read(flu_pid(Head), Epoch, LPN) of
{ok, Page} ->
?EVENT_LOG({read_repair, LPN, Head, ok}),
read_repair_chain2(Rest, Epoch, LPN, Page, Chain);
error_badepoch ->
?EVENT_LOG({read_repair, LPN, Head, badepoch}),
error_badepoch;
error_trimmed ->
?EVENT_LOG({read_repair, LPN, Head, trimmed}),
%% TODO: robustify
[begin
?EVENT_LOG({read_repair, LPN, fill, flu_pid(X)}),
ok = case ok_or_trim(corfurl_flu:fill(flu_pid(X), Epoch,
LPN)) of
ok ->
?EVENT_LOG({read_repair, LPN, fill, flu_pid(X), ok}),
ok;
error_overwritten ->
?EVENT_LOG({read_repair, LPN, fill, flu_pid(X), overwritten, try_to_trim}),
Res2 = ok_or_trim(corfurl_flu:trim(
flu_pid(X), Epoch, LPN)),
?EVENT_LOG({read_repair, LPN, fill, flu_pid(X), trim, Res2}),
case Res2 of ok -> ok;
_ -> throw({i_give_up,Res2})
end;
Else ->
%% We're too deeply nested for the current code
%% to deal with, and we're racing. Fine, let
%% our opponent continue. We'll give up, and if
%% the client wants to try again, we can try
%% again from the top.
?EVENT_LOG({read_repair, LPN, fill, flu_pid(X), Else}),
throw({i_give_up,Else})
end
end || X <- Rest],
error_trimmed;
error_unwritten ->
?EVENT_LOG({read_repair, LPN, read, Head, unwritten}),
error_unwritten
%% Let it crash: error_overwritten
end.
read_repair_chain2([] = _Repairees, _Epoch, _LPN, Page, _OriginalChain) ->
?EVENT_LOG({read_repair2, _LPN, finished, {ok, Page}}),
{ok, Page};
read_repair_chain2([RepairFLU|Rest], Epoch, LPN, Page, OriginalChain) ->
case corfurl_flu:write(flu_pid(RepairFLU), Epoch, LPN, Page) of
ok ->
?EVENT_LOG({read_repair2, LPN, write, flu_pid(RepairFLU), ok}),
read_repair_chain2(Rest, Epoch, LPN, Page, OriginalChain);
error_badepoch ->
?EVENT_LOG({read_repair2, LPN, write, flu_pid(RepairFLU), badepoch}),
error_badepoch;
error_trimmed ->
?EVENT_LOG({read_repair2, LPN, write, flu_pid(RepairFLU), trimmed}),
error_trimmed;
error_overwritten ->
?EVENT_LOG({read_repair2, LPN, write, flu_pid(RepairFLU), overwritten}),
%% We're going to do an optional sanity check here.
%% TODO: make the sanity check configurable?
case corfurl_flu:read(flu_pid(RepairFLU), Epoch, LPN) of
{ok, Page2} when Page2 =:= Page ->
?EVENT_LOG({read_repair2, LPN, read, flu_pid(RepairFLU), exact_page}),
%% We're probably going to be racing against someone else
%% that's also doing repair, but so be it.
read_repair_chain2(Rest, Epoch, LPN, Page, OriginalChain);
{ok, _Page2} ->
?EVENT_LOG({read_repair2, LPN, read, flu_pid(RepairFLU), bad_page, _Page2}),
giant_error({bummerbummer, ?MODULE, ?LINE,
sanity_check_failure, lpn, LPN, epoch, Epoch});
error_badepoch ->
?EVENT_LOG({read_repair2, LPN, read, flu_pid(RepairFLU), badepoch}),
error_badepoch;
error_trimmed ->
?EVENT_LOG({read_repair2, LPN, read, flu_pid(RepairFLU), trimmed}),
%% Start repair at the beginning to handle this case
read_repair_chain(Epoch, LPN, OriginalChain)
%% Let it crash: error_overwritten, error_unwritten
end
%% Let it crash: error_unwritten
end.
scan_forward(P, LPN, MaxPages) ->
scan_forward(P, LPN, MaxPages, ok, true, []).
scan_forward(_P, LPN, 0, Status, MoreP, Acc) ->
{Status, LPN, MoreP, lists:reverse(Acc)};
scan_forward(P, LPN, MaxPages, _Status, _MoreP, Acc) ->
case read_page(P, LPN) of
{ok, Page} ->
Res = {LPN, Page},
scan_forward(P, LPN + 1, MaxPages - 1, ok, true, [Res|Acc]);
error_badepoch ->
%% Halt, allow recursion to create our return value.
scan_forward(P, LPN, 0, error_badepoch, false, Acc);
error_trimmed ->
%% TODO: API question, do we add a 'trimmed' indicator
%% in the Acc? Or should the client assume that if
%% scan_forward() doesn't mention a page that
scan_forward(P, LPN + 1, MaxPages - 1, ok, true, Acc);
error_unwritten ->
%% Halt, allow recursion to create our return value.
%% TODO: It's possible that we're stuck here because a client
%% crashed and that we see an unwritten page at LPN.
%% We ought to ask the sequencer always/sometime?? what
%% tail LPN is, and if there's a hole, start a timer to
%% allow us to fill the hole.
scan_forward(P, LPN, 0, ok, false, Acc)
%% Let it crash: error_overwritten
end.
fill_page(#proj{epoch=Epoch} = P, LPN) ->
Chain = project_to_chain(LPN, P),
fill_or_trim_page(Chain, Epoch, LPN, fill).
trim_page(#proj{epoch=Epoch} = P, LPN) ->
Chain = project_to_chain(LPN, P),
fill_or_trim_page(Chain, Epoch, LPN, trim).
fill_or_trim_page([], _Epoch, _LPN, _Func) ->
ok;
fill_or_trim_page([H|T], Epoch, LPN, Func) ->
%% io:format(user, "~s.erl line ~w: TODO: this 'fill or trim' logic is probably stupid, due to mis-remembering the CORFU paper, sorry! Commenting out this warning line is OK, if you wish to proceed with testing Corfurl. This code can change a fill into a trim. Those things are supposed to be separate, silly me, a fill should never automagically change to a trim.\n", [?MODULE, ?LINE]),
case corfurl_flu:Func(flu_pid(H), Epoch, LPN) of
Res when Res == ok; Res == error_trimmed ->
%% Detecting a race here between fills and trims is too crazy,
%% and I don't believe that it *matters*. The ickiest one
%% is a race between Proc A = trim and Proc B = read,
%% chain length of 2 or more:
%% Proc A: trim head -> ok
%% Proc B: read tail -> error_unwritten
%% Proc B: read head -> error_trimmed
%% Proc B: trim tail -> ok
%% Proc A: trim tail -> ??
%%
%% The result that we want that both A & B & any later
%% readers agree that the LPN is trimmed. If the chain is
%% >2, then the procs can win some/all/none of the races
%% to fix up the chain, that's no problem. But don't tell
%% the caller that there was an error during those races.
fill_or_trim_page(T, Epoch, LPN, Func);
Else ->
%% TODO: worth doing anything here, if we're in the middle of chain?
%% TODO: is that ^^ anything different for fill vs. trim?
Else
end.
flu_pid(X) when is_pid(X) ->
X;
flu_pid(X) when is_atom(X) ->
ets:lookup_element(flu_pid_tab, X, 1).
giant_error(Err) ->
io:format(user, "GIANT ERROR: ~p\n", [Err]),
exit(Err).
%%%% %%%% %%%% projection utilities %%%% %%%% %%%%
new_range(Start, End, ChainList) ->
%% TODO: sanity checking of ChainList, Start < End, yadda
#range{pn_start=Start, pn_end=End, chains=list_to_tuple(ChainList)}.
new_simple_projection(Dir, Epoch, Start, End, ChainList) ->
ok = filelib:ensure_dir(Dir ++ "/unused"),
#proj{dir=Dir, epoch=Epoch, r=[new_range(Start, End, ChainList)]}.
make_projection_path(Dir, Epoch) ->
lists:flatten(io_lib:format("~s/~12..0w.proj", [Dir, Epoch])).
read_projection(Dir, Epoch) ->
case file:read_file(make_projection_path(Dir, Epoch)) of
{ok, Bin} ->
{ok, binary_to_term(Bin)}; % TODO if corrupted?
{error, enoent} ->
error_unwritten;
Else ->
Else % TODO API corner case
end.
save_projection(Dir, #proj{epoch=Epoch} = P) ->
Path = make_projection_path(Dir, Epoch),
ok = filelib:ensure_dir(Dir ++ "/ignored"),
{_, B, C} = now(),
TmpPath = Path ++ lists:flatten(io_lib:format(".~w.~w.~w", [B, C, node()])),
%% TODO: don't be lazy, do a flush before link when training wheels come off
ok = file:write_file(TmpPath, term_to_binary(P)),
case file:make_link(TmpPath, Path) of
ok ->
file:delete(TmpPath),
ok;
{error, eexist} ->
error_overwritten;
Else ->
Else % TODO API corner case
end.
latest_projection_epoch_number(Dir) ->
case filelib:wildcard("*.proj", Dir) of
[] ->
-1;
Files ->
{Epoch, _} = string:to_integer(lists:last(Files)),
Epoch
end.
project_to_chain(LPN, P) ->
%% TODO fixme
%% TODO something other than round-robin?
[#range{pn_start=Start, pn_end=End, chains=Chains}] = P#proj.r,
if Start =< LPN, LPN =< End ->
I = ((LPN - Start) rem tuple_size(Chains)) + 1,
element(I, Chains)
end.

View file

@ -0,0 +1,263 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(corfurl_client).
-export([append_page/2, read_page/2, fill_page/2, trim_page/2, scan_forward/3]).
-export([restart_sequencer/1]).
%% For debugging/verification only
-export([pulse_tracing_start/1, pulse_tracing_add/2, pulse_tracing_get/1]).
-include("corfurl.hrl").
-define(LONG_TIME, 5*1000).
%% -define(LONG_TIME, 30*1000).
append_page(Proj, Page) ->
append_page(Proj, Page, 5).
append_page(Proj, _Page, 0) ->
{{error_failed, ?MODULE, ?LINE}, Proj};
append_page(#proj{seq={Sequencer,_,_}} = Proj, Page, Retries) ->
try
{ok, LPN} = corfurl_sequencer:get(Sequencer, 1),
pulse_tracing_add(write, LPN),
append_page1(Proj, LPN, Page, 5)
catch
exit:{Reason,{_gen_server_or_pulse_gen_server,call,[Sequencer|_]}}
when Reason == noproc; Reason == normal ->
append_page(restart_sequencer(Proj), Page, Retries);
exit:Exit ->
{{error_failed, ?MODULE, ?LINE}, incomplete_code, Exit}
end.
append_page1(Proj, _LPN, _Page, 0) ->
{{error_failed, ?MODULE, ?LINE}, Proj};
append_page1(Proj, LPN, Page, Retries) ->
case append_page2(Proj, LPN, Page) of
lost_race ->
append_page(Proj, Page, Retries - 1);
error_badepoch ->
case poll_for_new_epoch_projection(Proj) of
{ok, NewProj} ->
append_page1(NewProj, LPN, Page, Retries - 1);
Else ->
{Else, Proj}
end;
Else ->
{Else, Proj}
end.
append_page2(Proj, LPN, Page) ->
case corfurl:write_page(Proj, LPN, Page) of
ok ->
{ok, LPN};
X when X == error_overwritten; X == error_trimmed ->
report_lost_race(LPN, X),
lost_race;
{special_trimmed, LPN}=XX ->
XX;
error_badepoch=XX->
XX
%% Let it crash: error_unwritten
end.
read_page(Proj, LPN) ->
retry_loop(Proj, fun(P) -> corfurl:read_page(P, LPN) end, 10).
fill_page(Proj, LPN) ->
retry_loop(Proj, fun(P) -> corfurl:fill_page(P, LPN) end, 10).
trim_page(Proj, LPN) ->
retry_loop(Proj, fun(P) -> corfurl:trim_page(P, LPN) end, 10).
scan_forward(Proj, LPN, MaxPages) ->
%% This is fiddly stuff that I'll get 0.7% wrong if I try to be clever.
%% So, do something simple and (I hope) obviously correct.
%% TODO: do something "smarter".
case corfurl:scan_forward(Proj, LPN, MaxPages) of
{error_badepoch, _LPN2, _MoreP, _Pages} = Res ->
case poll_for_new_epoch_projection(Proj) of
{ok, NewProj} ->
{Res, NewProj};
_Else ->
%% TODO: What is the risk of getting caught in a situation
%% where we can never make any forward progress when pages
%% really are being written?
{Res, Proj}
end;
Res ->
{Res, Proj}
end.
%%%%% %%%%% %%%%% %%%%% %%%%% %%%%% %%%%% %%%%% %%%%%
retry_loop(Proj, _Fun, 0) ->
{{error_failed, ?MODULE, ?LINE}, Proj};
retry_loop(Proj, Fun, Retries) ->
case Fun(Proj) of
error_badepoch ->
case poll_for_new_epoch_projection(Proj) of
{ok, NewProj} ->
retry_loop(NewProj, Fun, Retries - 1);
_Else ->
{{error_failed, ?MODULE, ?LINE}, Proj}
end;
Else ->
{Else, Proj}
end.
restart_sequencer(#proj{epoch=Epoch, dir=Dir} = P) ->
case corfurl:latest_projection_epoch_number(Dir) of
N when N > Epoch ->
%% Yay, someone else has intervened. Perhaps they've solved
%% our sequencer problem for us?
read_latest_projection(P);
_ ->
restart_sequencer2(P)
end.
restart_sequencer2(#proj{seq={OldSequencer, _SeqHost, SeqName},
epoch=Epoch, r=Ranges} = P) ->
spawn(fun() ->
(catch corfurl_sequencer:stop(OldSequencer))
end),
TODO_type = standard, % TODO: fix this hard-coding
FLUs = lists:usort(
[FLU || R <- Ranges,
C <- tuple_to_list(R#range.chains), FLU <- C]),
%% TODO: We can proceed if we can seal at least one FLU in
%% each chain. Robustify and sanity check.
[begin
_Res = corfurl_flu:seal(FLU, Epoch)
end || FLU <- lists:reverse(FLUs)],
case corfurl_sequencer:start_link(FLUs, TODO_type, SeqName) of
{ok, Pid} ->
NewP = P#proj{seq={Pid, node(), SeqName}, epoch=Epoch+1},
save_projection_or_get_latest(NewP)
end.
poll_for_new_epoch_projection(P) ->
put(silly_poll_counter, 0),
poll_for_new_epoch_projection(P, get_poll_retries()).
poll_for_new_epoch_projection(P, 0) ->
%% TODO: The client that caused the seal may have crashed before
%% writing a new projection. We should try to pick up here,
%% write a new projection, and bully forward.
%% NOTE: When that new logic is added, the huge polling interval
%% that PULSE uses should be reduced to something tiny.
case corfurl:latest_projection_epoch_number(P#proj.dir) of
Neg when Neg < 0 ->
error_badepoch;
Other ->
exit({bummer, ?MODULE, ?LINE, latest_epoch, Other})
end;
poll_for_new_epoch_projection(#proj{dir=Dir, epoch=Epoch} = P, Tries) ->
case corfurl:latest_projection_epoch_number(Dir) of
NewEpoch when NewEpoch > Epoch ->
corfurl:read_projection(Dir, NewEpoch);
_ ->
timer:sleep(get_poll_sleep_time()),
case put(silly_poll_counter, get(silly_poll_counter) + 1) div 10*1000 of
0 -> io:format(user, "P", []);
_ -> ok
end,
poll_for_new_epoch_projection(P, Tries - 1)
end.
save_projection_or_get_latest(#proj{dir=Dir} = P) ->
case corfurl:save_projection(Dir, P) of
ok ->
P;
error_overwritten ->
read_latest_projection(P)
end.
read_latest_projection(#proj{dir=Dir}) ->
NewEpoch = corfurl:latest_projection_epoch_number(Dir),
{ok, NewP} = corfurl:read_projection(Dir, NewEpoch),
NewP.
-ifdef(TEST).
-ifdef(PULSE).
report_lost_race(_LPN, _Reason) ->
%% It's interesting (sometime?) to know if a page was overwritten
%% because the sequencer was configured by QuickCheck to hand out
%% duplicate LPNs. If this gets too annoying, this can be a no-op
%% function.
io:format(user, "o", []).
-else. % PULSE
report_lost_race(LPN, Reason) ->
io:format(user, "LPN ~p race lost: ~p\n", [LPN, Reason]).
-endif. % PULSE
-else. % TEST
report_lost_race(LPN, Reason) ->
%% Perhaps it's an interesting event, but the rest of the system
%% should react correctly whenever this happens, so it shouldn't
%% ever cause an external consistency problem.
error_logger:debug_msg("LPN ~p race lost: ~p\n", [LPN, Reason]).
-endif. % TEST
-ifdef(PULSE).
get_poll_retries() ->
999*1000.
get_poll_sleep_time() ->
1.
-else.
get_poll_retries() ->
25.
get_poll_sleep_time() ->
50.
-endif.
-ifdef(PULSE).
pulse_tracing_start(Type) ->
put({?MODULE, Type}, []).
pulse_tracing_add(Type, Stuff) ->
List = case pulse_tracing_get(Type) of
undefined -> [];
L -> L
end,
put({?MODULE, Type}, [Stuff|List]).
pulse_tracing_get(Type) ->
get({?MODULE, Type}).
-else.
pulse_tracing_start(_Type) ->
ok.
pulse_tracing_add(_Type, _Stuff) ->
ok.
pulse_tracing_get(_Type) ->
ok.
-endif.

View file

@ -0,0 +1,467 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(corfurl_flu).
-behaviour(gen_server).
-type flu_error() :: 'error_badepoch' | 'error_trimmed' |
'error_overwritten' | 'error_unwritten'.
-export_type([flu_error/0]).
%% API
-export([start_link/1, start_link/3, status/1, stop/1]).
-export([write/4, read/3, seal/2, trim/3, fill/3]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-include("corfurl.hrl").
-ifdef(TEST).
-export([get__mlp/1, get__min_epoch/1, get__trim_watermark/1]).
-compile(export_all).
-ifdef(PULSE).
-compile({parse_transform, pulse_instrument}).
-compile({pulse_skip,[{msc, 3}]}).
-endif.
-endif.
-include_lib("kernel/include/file.hrl").
%%% Debugging: for extra events in the PULSE event log, use the 2nd statement.
-define(EVENT_LOG(X), ok).
%% -define(EVENT_LOG(X), event_logger:event(X)).
-record(state, {
dir :: string(),
mem_fh :: term(),
min_epoch :: non_neg_integer(),
page_size :: non_neg_integer(),
max_mem :: non_neg_integer(),
max_logical_page :: 'unknown' | non_neg_integer(),
%% TODO: Trim watermark handling is *INCOMPLETE*. The
%% current code is broken but is occasionally correct,
%% like a broken analog watch is correct 2x per day.
trim_watermark :: non_neg_integer(),
trim_count :: non_neg_integer()
}).
start_link(Dir) ->
start_link(Dir, 8, 64*1024*1024).
start_link(Dir, PageSize, MaxMem) ->
gen_server:start_link(?MODULE, {Dir, PageSize, MaxMem}, []).
status(Pid) ->
gen_server:call(Pid, status, infinity).
stop(Pid) ->
gen_server:call(Pid, stop, infinity).
write(Pid, Epoch, LogicalPN, PageBin)
when is_integer(LogicalPN), LogicalPN > 0, is_binary(PageBin) ->
g_call(Pid, {write, Epoch, LogicalPN, PageBin}, infinity).
read(Pid, Epoch, LogicalPN)
when is_integer(Epoch), Epoch > 0, is_integer(LogicalPN), LogicalPN > 0 ->
g_call(Pid, {read, Epoch, LogicalPN}, infinity).
seal(Pid, Epoch) when is_integer(Epoch), Epoch > 0 ->
g_call(Pid, {seal, Epoch}, infinity).
trim(Pid, Epoch, LogicalPN)
when is_integer(Epoch), Epoch > 0, is_integer(LogicalPN), LogicalPN > 0 ->
g_call(Pid, {trim, Epoch, LogicalPN}, infinity).
fill(Pid, Epoch, LogicalPN)
when is_integer(Epoch), Epoch > 0, is_integer(LogicalPN), LogicalPN > 0 ->
Res = g_call(Pid, {fill, Epoch, LogicalPN}, infinity),
undo_special_pulse_test_result(Res).
g_call(Pid, Arg, Timeout) ->
LC1 = lclock_get(),
msc(self(), Pid, Arg),
{Res, LC2} = gen_server:call(Pid, {Arg, LC1}, Timeout),
msc(Pid, self(), Res),
lclock_update(LC2),
Res.
-ifdef(TEST).
get__mlp(Pid) ->
gen_server:call(Pid, get__mlp, infinity).
get__min_epoch(Pid) ->
gen_server:call(Pid, get__min_epoch, infinity).
get__trim_watermark(Pid) ->
gen_server:call(Pid, get__trim_watermark, infinity).
-endif. % TEST
%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%%
init({Dir, ExpPageSize, ExpMaxMem}) ->
lclock_init(),
MemFile = memfile_path(Dir),
filelib:ensure_dir(MemFile),
{ok, FH} = file:open(MemFile, [read, write, raw, binary]),
{_Version, MinEpoch, PageSize, MaxMem, TrimWatermark} =
try
Res = read_hard_state(Dir),
case Res of
{_V, _LE, PS, MM, TW}
when PS =:= ExpPageSize, MM =:= ExpMaxMem ->
Res
end
catch
X:Y ->
if X == error,
Y == {case_clause,{error,enoent}} ->
ok;
true ->
%% TODO: log-ify this
io:format("init: caught ~p ~p @ ~p\n",
[X, Y, erlang:get_stacktrace()])
end,
{no_version_number, 0, ExpPageSize, ExpMaxMem, 0}
end,
State = #state{dir=Dir, mem_fh=FH, min_epoch=MinEpoch, page_size=PageSize,
max_mem=MaxMem, max_logical_page=unknown,
trim_watermark=TrimWatermark, trim_count=0},
self() ! finish_init, % TODO
{ok, State}.
handle_call(Call, From, #state{max_logical_page=unknown} = State) ->
{noreply, NewState} = handle_info(finish_init, State),
handle_call(Call, From, NewState);
handle_call({{write, ClientEpoch, _LogicalPN, _PageBin}, LC1}, _From,
#state{min_epoch=MinEpoch} = State)
when ClientEpoch < MinEpoch ->
LC2 = lclock_update(LC1),
{reply, {error_badepoch, LC2}, State};
handle_call({{write, _ClientEpoch, LogicalPN, PageBin}, LC1}, _From,
#state{max_logical_page=MLPN} = State) ->
LC2 = lclock_update(LC1),
case check_write(LogicalPN, PageBin, State) of
{ok, Offset} ->
ok = write_page(Offset, LogicalPN, PageBin, State),
NewMLPN = erlang:max(LogicalPN, MLPN),
?EVENT_LOG({flu, write, self(), LogicalPN, ok}),
{reply, {ok, LC2}, State#state{max_logical_page=NewMLPN}};
Else ->
?EVENT_LOG({flu, write, self(), LogicalPN, Else}),
{reply, {Else, LC2}, State}
end;
handle_call({{read, ClientEpoch, _LogicalPN}, LC1}, _From,
#state{min_epoch=MinEpoch} = State)
when ClientEpoch < MinEpoch ->
LC2 = lclock_update(LC1),
{reply, {error_badepoch, LC2}, State};
handle_call({{read, _ClientEpoch, LogicalPN}, LC1}, _From, State) ->
LC2 = lclock_update(LC1),
Reply = read_page(LogicalPN, State),
?EVENT_LOG({flu, read, self(), LogicalPN, Reply}),
{reply, {Reply, LC2}, State};
handle_call({{seal, ClientEpoch}, LC1}, _From, #state{min_epoch=MinEpoch} = State)
when ClientEpoch < MinEpoch ->
LC2 = lclock_update(LC1),
{reply, {error_badepoch, LC2}, State};
handle_call({{seal, ClientEpoch}, LC1}, _From, #state{max_logical_page=MLPN}=State) ->
LC2 = lclock_update(LC1),
NewState = State#state{min_epoch=ClientEpoch+1},
ok = write_hard_state(NewState),
{reply, {{ok, MLPN}, LC2}, NewState};
handle_call({{trim, ClientEpoch, _LogicalPN}, LC1}, _From,
#state{min_epoch=MinEpoch} = State)
when ClientEpoch < MinEpoch ->
LC2 = lclock_update(LC1),
{reply, {error_badepoch, LC2}, State};
handle_call({{trim, _ClientEpoch, LogicalPN}, LC1}, _From, State) ->
LC2 = lclock_update(LC1),
{Reply, NewState} = do_trim_or_fill(trim, LogicalPN, State),
?EVENT_LOG({flu, trim, self(), LogicalPN, Reply}),
{reply, {Reply, LC2}, NewState};
handle_call({{fill, ClientEpoch, _LogicalPN}, LC1}, _From,
#state{min_epoch=MinEpoch} = State)
when ClientEpoch < MinEpoch ->
LC2 = lclock_update(LC1),
{reply, {error_badepoch, LC2}, State};
handle_call({{fill, _ClientEpoch, LogicalPN}, LC1}, _From, State) ->
LC2 = lclock_update(LC1),
io:format(user, "~s.erl line ~w: TODO: this 'fill or trim' logic is probably stupid, due to mis-remembering the CORFU paper, sorry! Commenting out this warning line is OK, if you wish to proceed with testing Corfurl. This code can change a fill into a trim. Those things are supposed to be separate, silly me, a fill should never automagically change to a trim.\n", [?MODULE, ?LINE]),
{Reply, NewState} = do_trim_or_fill(fill, LogicalPN, State),
?EVENT_LOG({flu, fill, self(), LogicalPN, Reply}),
{reply, {Reply, LC2}, NewState};
handle_call(get__mlp, _From, State) ->
{reply, State#state.max_logical_page, State};
handle_call(get__min_epoch, _From, State) ->
{reply, State#state.min_epoch, State};
handle_call(get__trim_watermark, _From, State) ->
{reply, State#state.trim_watermark, State};
handle_call(status, _From, State) ->
L = [{min_epoch, State#state.min_epoch},
{page_size, State#state.page_size},
{max_mem, State#state.max_mem},
{max_logical_page, State#state.max_logical_page},
{trim_watermark, State#state.trim_watermark}],
{reply, {ok, L}, State};
handle_call(stop, _From, State) ->
{stop, normal, ok, State};
handle_call(Request, _From, State) ->
Reply = {whaaaaaaaaaaaaaaaaaa, Request},
{reply, Reply, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(finish_init, State) ->
MLP = find_max_logical_page(State),
State2 = State#state{max_logical_page=MLP},
ok = write_hard_state(State2),
{noreply, State2};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, State) ->
ok = write_hard_state(State),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%%
read_hard_state(Dir) ->
File = hard_state_path(Dir),
case file:read_file(File) of
{ok, Bin} ->
case binary_to_term(Bin) of
T when element(1, T) == v1 ->
T
end;
Else ->
Else
end.
write_hard_state(#state{min_epoch=MinEpoch, page_size=PageSize, max_mem=MaxMem,
trim_watermark=TrimWatermark} = S) ->
NewPath = hard_state_path(S#state.dir),
TmpPath = NewPath ++ ".tmp",
{ok, FH} = file:open(TmpPath, [write, binary, raw]),
HS = {v1, MinEpoch, PageSize, MaxMem, TrimWatermark},
ok = file:write(FH, term_to_binary(HS)),
%% ok = file:sync(FH), % TODO uncomment when the training wheels come off
ok = file:close(FH),
ok = file:rename(TmpPath, NewPath).
memfile_path(Dir) ->
Dir ++ "/memfile".
hard_state_path(Dir) ->
Dir ++ "/hard-state".
calc_page_offset(PhysicalPN, #state{page_size=PageSize}) ->
TotalSize = ?PAGE_OVERHEAD + PageSize,
PhysicalPN * TotalSize.
%% find_max_logical_page(): This is a kludge, based on our naive
%% implementation of not keeping the maximum logical page in hard
%% state.
find_max_logical_page(S) ->
{ok, FI} = file:read_file_info(memfile_path(S#state.dir)),
find_max_logical_page(0, 0, FI#file_info.size, S).
find_max_logical_page(MLP, PhysicalPN, FSize,
#state{mem_fh=FH, max_mem=MaxMem}=S) ->
Offset = calc_page_offset(PhysicalPN, S),
if Offset < MaxMem, Offset < FSize ->
case file:pread(FH, Offset, 9) of
{ok, <<1:8/big, LP:64/big>>} ->
find_max_logical_page(erlang:max(MLP, LP), PhysicalPN + 1,
FSize, S);
_ ->
find_max_logical_page(MLP, PhysicalPN + 1, FSize, S)
end;
true ->
MLP
end.
check_write(LogicalPN, PageBin,
#state{max_mem=MaxMem, page_size=PageSize} = S) ->
Offset = calc_page_offset(LogicalPN, S),
if Offset < MaxMem, byte_size(PageBin) =:= PageSize ->
case check_is_written(Offset, LogicalPN, S) of
false ->
{ok, Offset};
true ->
error_overwritten
end;
true ->
{bummer, ?MODULE, ?LINE, lpn, LogicalPN, offset, Offset, max_mem, MaxMem, page_size, PageSize}
end.
check_is_written(Offset, _PhysicalPN, #state{mem_fh=FH}) ->
case file:pread(FH, Offset, 1) of
{ok, <<0:8>>} ->
false;
{ok, <<1:8>>} -> % written
true;
{ok, <<2:8>>} -> % trimmed
true;
eof ->
%% We assume that Offset has been bounds-checked
false
end.
write_page(Offset, LogicalPN, PageBin, #state{mem_fh=FH}) ->
IOList = [<<1:8>>, <<LogicalPN:64/big>>, PageBin, <<1:8>>],
ok = file:pwrite(FH, Offset, IOList).
read_page(LogicalPN, #state{max_mem=MaxMem, mem_fh=FH,
page_size=PageSize} = S) ->
Offset = calc_page_offset(LogicalPN, S),
if Offset < MaxMem ->
case file:pread(FH, Offset, PageSize + ?PAGE_OVERHEAD) of
{ok, <<1:8, LogicalPN:64/big, Page:PageSize/binary, 1:8>>} ->
{ok, Page};
{ok, <<1:8, _LogicalPN:64/big, _:PageSize/binary, 0:8>>} ->
io:format("BUMMER: ~s line ~w: incomplete write at ~p\n",
[?MODULE, ?LINE, LogicalPN]),
error_unwritten;
{ok, <<2:8, _/binary>>} ->
error_trimmed;
{ok, _} ->
error_unwritten;
eof ->
error_unwritten;
Else ->
io:format("BUMMER: ~s line ~w: ~p\n",
[?MODULE, ?LINE, Else]),
badarg % TODO: better idea
end;
true ->
badarg
end.
do_trim_or_fill(Op, LogicalPN,
#state{trim_watermark=TrimWatermark, trim_count=TrimCount} = S) ->
case trim_page(Op, LogicalPN, S) of
ok ->
NewS = S#state{trim_watermark=erlang:max(
TrimWatermark, LogicalPN),
trim_count=TrimCount + 1},
if TrimCount rem 1000 == 0 ->
ok = write_hard_state(NewS);
true ->
ok
end,
{ok, NewS};
Else ->
{Else, S}
end.
trim_page(Op, LogicalPN, #state{max_mem=MaxMem, mem_fh=FH} = S) ->
Offset = calc_page_offset(LogicalPN, S),
if Offset < MaxMem ->
Status = case file:pread(FH, Offset, 1) of
{ok, <<0:8>>} ->
error_unwritten;
{ok, <<1:8>>} ->
error_overwritten;
{ok, <<2:8>>} ->
error_trimmed;
eof ->
error_unwritten;
Else ->
io:format("BUMMER: ~s line ~w: ~p\n",
[?MODULE, ?LINE, Else]),
error_trimmed % TODO
end,
if Status == error_overwritten andalso Op == trim ->
ok = file:pwrite(FH, Offset, <<2:8>>),
ok;
Status == error_unwritten andalso Op == fill ->
ok = file:pwrite(FH, Offset, <<2:8>>),
ok;
true ->
Status
end;
true ->
badarg
end.
-ifdef(PULSE).
%% We do *not* want to remove any special PULSE return code.
undo_special_pulse_test_result(Res) ->
Res.
-else. % PULSE
undo_special_pulse_test_result({special_trimmed, LPN}) ->
{ok, LPN};
undo_special_pulse_test_result(Res) ->
Res.
-endif. % PULSE
-ifdef(PULSE_HACKING).
%% Create a trace file that can be formatted by "mscgen" utility.
%% Lots of hand-editing is required after creating the file, sorry!
msc(_From, _To, _Tag) ->
{ok, FH} = file:open("/tmp/goo", [write, append]),
io:format(FH, " \"~w\" -> \"~w\" [ label = \"~w\" ] ;\n", [_From, _To, _Tag]),
file:close(FH).
-else. % PULSE_HACKING
msc(_From, _To, _Tag) ->
ok.
-endif. % PULSE_HACkING
-ifdef(PULSE).
lclock_init() ->
lamport_clock:init().
lclock_get() ->
lamport_clock:get().
lclock_update(LC) ->
lamport_clock:update(LC).
-else. % PULSE
lclock_init() ->
ok.
lclock_get() ->
ok.
lclock_update(_LC) ->
ok.
-endif. % PLUSE

View file

@ -0,0 +1,154 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(corfurl_sequencer).
-behaviour(gen_server).
-export([start_link/1, stop/1, stop/2,
get/2]).
-ifdef(TEST).
-export([start_link/2]).
-compile(export_all).
-endif.
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
-ifdef(PULSE).
-compile({parse_transform, pulse_instrument}).
-endif.
-endif.
-define(SERVER, ?MODULE).
%% -define(LONG_TIME, 30*1000).
-define(LONG_TIME, 5*1000).
start_link(FLUs) ->
start_link(FLUs, standard).
start_link(FLUs, SeqType) ->
start_link(FLUs, SeqType, ?SERVER).
start_link(FLUs, SeqType, RegName) ->
case gen_server:start_link({local, RegName}, ?MODULE, {FLUs, SeqType},[]) of
{ok, Pid} ->
{ok, Pid};
{error, {already_started, Pid}} ->
{ok, Pid};
Else ->
Else
end.
stop(Pid) ->
stop(Pid, stop).
stop(Pid, Method) ->
Res = gen_server:call(Pid, stop, infinity),
if Method == kill ->
%% Emulate gen.erl's client-side behavior when the server process
%% is killed.
exit(killed);
true ->
Res
end.
get(Pid, NumPages) ->
{LPN, LC} = gen_server:call(Pid, {get, NumPages, lclock_get()}, ?LONG_TIME),
lclock_update(LC),
LPN.
%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%%
init({FLUs, TypeOrSeed}) ->
lclock_init(),
MLP = get_max_logical_page(FLUs),
if TypeOrSeed == standard ->
{ok, MLP + 1};
true ->
{Seed, BadPercent, MaxDifference} = TypeOrSeed,
random:seed(Seed),
{ok, {MLP+1, BadPercent, MaxDifference}}
end.
handle_call({get, NumPages, LC}, _From, MLP) when is_integer(MLP) ->
NewLC = lclock_update(LC),
{reply, {{ok, MLP}, NewLC}, MLP + NumPages};
handle_call({get, NumPages, LC}, _From, {MLP, BadPercent, MaxDifference}) ->
NewLC = lclock_update(LC),
Fudge = case random:uniform(100) of
N when N < BadPercent ->
random:uniform(MaxDifference * 2) - MaxDifference;
_ ->
0
end,
{reply, {{ok, erlang:max(1, MLP + Fudge)}, NewLC},
{MLP + NumPages, BadPercent, MaxDifference}};
handle_call(stop, _From, MLP) ->
{stop, normal, ok, MLP};
handle_call(_Request, _From, MLP) ->
Reply = whaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa,
{reply, Reply, MLP}.
handle_cast(_Msg, MLP) ->
{noreply, MLP}.
handle_info(_Info, MLP) ->
{noreply, MLP}.
terminate(_Reason, _MLP) ->
ok.
code_change(_OldVsn, MLP, _Extra) ->
{ok, MLP}.
%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%%
get_max_logical_page(FLUs) ->
lists:max([proplists:get_value(max_logical_page, Ps, 0) ||
FLU <- FLUs,
{ok, Ps} <- [corfurl_flu:status(FLU)]]).
-ifdef(PULSE).
lclock_init() ->
lamport_clock:init().
lclock_get() ->
lamport_clock:get().
lclock_update(LC) ->
lamport_clock:update(LC).
-else. % PULSE
lclock_init() ->
ok.
lclock_get() ->
ok.
lclock_update(_LC) ->
ok.
-endif. % PLUSE

View file

@ -0,0 +1,40 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(corfurl_util).
-export([delete_dir/1]).
-ifdef(PULSE).
-compile({parse_transform, pulse_instrument}).
-endif.
delete_dir(Dir) ->
%% We don't recursively delete directories, the ok pattern match will fail.
[ok = file:delete(X) || X <- filelib:wildcard(Dir ++ "/*")],
case file:del_dir(Dir) of
ok ->
ok;
{error, enoent} ->
ok;
Else ->
Else
end.

View file

@ -0,0 +1,135 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(corfurl_flu_test).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
-endif.
-include("corfurl.hrl").
-define(M, corfurl_flu).
-ifdef(TEST).
-ifndef(PULSE).
startstop_test() ->
Dir = "/tmp/flu." ++ os:getpid(),
{ok, P1} = ?M:start_link(Dir),
try
{ok, _} = ?M:status(P1),
ok = ?M:stop(P1),
{'EXIT', _} = (catch ?M:stop(P1)),
{ok, P2} = ?M:start_link(Dir),
0 = ?M:get__mlp(P2),
0 = ?M:get__min_epoch(P2),
ok = ?M:stop(P2),
ok
after
ok = corfurl_util:delete_dir(Dir)
end.
basic_test() ->
Dir = "/tmp/flu." ++ os:getpid(),
{ok, P1} = ?M:start_link(Dir),
try
Epoch1 = 1,
Epoch2 = 2,
Epoch3 = 3,
LPN = 1,
Bin1 = <<42:64>>,
Bin2 = <<42042:64>>,
error_unwritten = ?M:read(P1, Epoch1, LPN),
error_unwritten = ?M:trim(P1, Epoch1, LPN),
error_unwritten = ?M:trim(P1, Epoch1, LPN+77),
ok = ?M:write(P1, Epoch1, LPN, Bin1),
error_overwritten = ?M:write(P1, Epoch1, LPN, Bin1),
error_overwritten = ?M:fill(P1, Epoch1, LPN),
LPN = ?M:get__mlp(P1),
0 = ?M:get__min_epoch(P1),
0 = ?M:get__trim_watermark(P1),
{ok, LPN} = ?M:seal(P1, Epoch1),
2 = ?M:get__min_epoch(P1),
error_overwritten = ?M:write(P1, Epoch2, LPN, Bin1),
ok = ?M:write(P1, Epoch2, LPN+1, Bin2),
Epoch2 = ?M:get__min_epoch(P1),
error_badepoch = ?M:read(P1, Epoch1, LPN),
{ok, Bin2} = ?M:read(P1, Epoch2, LPN+1),
error_unwritten = ?M:read(P1, Epoch2, LPN+2),
badarg = ?M:read(P1, Epoch2, 1 bsl 2982),
error_badepoch = ?M:seal(P1, Epoch1),
{ok, _} = ?M:seal(P1, Epoch2),
error_badepoch = ?M:seal(P1, Epoch2),
error_badepoch = ?M:read(P1, Epoch1, LPN),
error_badepoch = ?M:read(P1, Epoch1, LPN+1),
{ok, Bin1} = ?M:read(P1, Epoch3, LPN),
{ok, Bin2} = ?M:read(P1, Epoch3, LPN+1),
error_badepoch = ?M:trim(P1, Epoch1, LPN+1),
ok = ?M:trim(P1, Epoch3, LPN+1),
error_trimmed = ?M:trim(P1, Epoch3, LPN+1),
%% Current watermark processing is broken. But we'll test what's
%% there now.
ExpectedWaterFixMe = LPN+1,
ExpectedWaterFixMe = ?M:get__trim_watermark(P1),
ok = ?M:fill(P1, Epoch3, LPN+3),
error_trimmed = ?M:read(P1, Epoch3, LPN+3),
error_trimmed = ?M:fill(P1, Epoch3, LPN+3),
error_trimmed = ?M:trim(P1, Epoch3, LPN+3),
Epoch3 = ?M:get__min_epoch(P1),
ok = ?M:stop(P1),
ok
after
ok = corfurl_util:delete_dir(Dir)
end.
seal_persistence_test() ->
Dir = "/tmp/flu." ++ os:getpid(),
{ok, P1} = ?M:start_link(Dir),
try
0 = ?M:get__min_epoch(P1),
Epoch = 665,
{ok, LPN} = ?M:seal(P1, Epoch-1),
Epoch = ?M:get__min_epoch(P1),
ok = ?M:stop(P1),
{ok, P2} = ?M:start_link(Dir),
Epoch = ?M:get__min_epoch(P2),
ok = ?M:stop(P2),
ok
after
ok = corfurl_util:delete_dir(Dir)
end.
-endif. % not PULSE
-endif. % TEST

View file

@ -0,0 +1,950 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(corfurl_pulse).
-ifdef(TEST).
-ifdef(PULSE).
-compile(export_all).
-include_lib("eqc/include/eqc.hrl").
-include_lib("eqc/include/eqc_statem.hrl").
-include("corfurl.hrl").
-include_lib("eunit/include/eunit.hrl").
-compile({parse_transform, pulse_instrument}).
-compile({pulse_skip,[{prop_pulse_test_,0},{clean_up_runtime,1},{delete_dir,1}]}).
%% -compile({pulse_no_side_effect,[{file,'_','_'}, {erlang, now, 0}]}).
%% Used for output within EUnit...
-define(QC_FMT(Fmt, Args),
io:format(user, Fmt, Args)).
%% And to force EUnit to output QuickCheck output...
-define(QC_OUT(P),
eqc:on_output(fun(Str, Args) -> ?QC_FMT(Str, Args) end, P)).
-define(MAX_PAGES, 50000).
-define(MY_TAB, i_have_a_name).
-define(MY_KEY, ?MY_TAB).
-define(PROJECTION_DIR, "./tmp.projection." ++ os:getpid()).
-define(SEQUENCER_NAME, 'corfurl pulse seq thingie').
-record(run, {
proj, % Projection
flus % List of FLUs
}).
-record(state, {
is_setup = false :: boolean(),
num_chains = 0 :: integer(),
chain_len = 0 :: integer(),
page_size = 0 :: integer(),
run :: #run{}
}).
%% Model testing things:
%% Define true to fake bad behavior that model **must** notice & fail!
-ifndef(TRIP_no_append_duplicates).
-define(TRIP_no_append_duplicates, false).
-endif.
-ifndef(TRIP_bad_read).
-define(TRIP_bad_read, false).
-endif.
-ifndef(TRIP_bad_scan_forward).
-define(TRIP_bad_scan_forward, false).
-endif.
-ifndef(TRIP_bad_fill).
-define(TRIP_bad_fill, false).
-endif.
-ifndef(TRIP_bad_trim).
-define(TRIP_bad_trim, false).
-endif.
initial_state() ->
#state{}.
gen_page(PageSize) ->
binary(PageSize).
gen_seed() ->
noshrink({choose(1, 20000), choose(1, 20000), choose(1, 20000)}).
gen_sequencer_percent() ->
frequency([{10, choose(1,100)},
{5, choose(90,100)}]).
gen_sequencer() ->
frequency([{100, standard},
{50, {gen_seed(), gen_sequencer_percent(), choose(1, 2)}}]).
gen_approx_page() ->
%% EQC can't know what pages are perhaps-written, so pick something big.
noshrink(?LET(I, largeint(), abs(I))).
gen_scan_forward_start() ->
oneof([1, gen_approx_page()]).
gen_stop_method() ->
oneof([stop, kill]).
command(#state{run=Run} = S) ->
?LET({NumChains, ChainLen, PageSize},
{parameter(num_chains), parameter(chain_len), parameter(page_size)},
frequency(
[{50, {call, ?MODULE, setup, [NumChains, ChainLen, PageSize, gen_sequencer()]}}
|| not S#state.is_setup] ++
[{50, {call, ?MODULE, append, [Run, gen_page(PageSize)]}}
|| S#state.is_setup] ++
[{15, {call, ?MODULE, read_approx, [Run, gen_approx_page()]}}
|| S#state.is_setup] ++
[{15, {call, ?MODULE, scan_forward, [Run, gen_scan_forward_start(), nat()]}}
|| S#state.is_setup] ++
[{12, {call, ?MODULE, fill, [Run, gen_approx_page()]}}
|| S#state.is_setup] ++
[{12, {call, ?MODULE, trim, [Run, gen_approx_page()]}}
|| S#state.is_setup] ++
[{10, {call, ?MODULE, stop_sequencer, [Run, gen_stop_method()]}}
|| S#state.is_setup] ++
[])).
%% Precondition, checked before a command is added to the command sequence.
precondition(S, {call, _, setup, _}) ->
not S#state.is_setup;
precondition(S, {call, _, _, _}) ->
S#state.is_setup.
%% Next state transformation, S is the current state and V is the result of the
%% command.
next_state(S, Res, {call, _, setup, [NumChains, ChainLen, PageSize, _SeqType]}) ->
S#state{is_setup=true,
num_chains=NumChains,
chain_len=ChainLen,
page_size=PageSize,
run=Res};
next_state(S, _, {call, _, append, _}) ->
S;
next_state(S, _, {call, _, read_approx, _}) ->
S;
next_state(S, _, {call, _, scan_forward, _}) ->
S;
next_state(S, _, {call, _, fill, _}) ->
S;
next_state(S, _, {call, _, trim, _}) ->
S;
next_state(S, _, {call, _, stop_sequencer, _}) ->
S.
eqeq(X, X) -> true;
eqeq(X, Y) -> {X, '/=', Y}.
postcondition(_S, {call, _, setup, _}, #run{} = _V) ->
true;
postcondition(_S, {call, _, append, _}, V) ->
case V of
{ok, LPN} when is_integer(LPN) -> true;
{special_trimmed, LPN} when is_integer(LPN) -> true;
error_badepoch -> true;
_ -> eqeq(V, todoTODO_fixit)
end;
postcondition(_S, {call, _, read_approx, _}, V) ->
valid_read_result(V);
postcondition(_S, {call, _, scan_forward, _}, V) ->
case V of
{ok, LastLSN, MoreP, Pages} ->
true = is_integer(LastLSN),
true = LastLSN > 0,
true = (MoreP == true orelse MoreP == false),
[] = lists:usort([X || {_LPN, Pg} <- Pages,
X <- [valid_read_result(Pg)], X /= true]),
true;
_ ->
eqeq(V, {todoTODO_fixit,?LINE})
end;
postcondition(_S, {call, _, FillTrim, _}, V)
when FillTrim == fill; FillTrim == trim ->
case V of
ok -> true;
error_trimmed -> true;
error_unwritten -> true;
error_overwritten -> true;
_ -> eqeq(V, {error, FillTrim, V})
end;
postcondition(_S, {call, _, stop_sequencer, _}, _V) ->
true.
valid_read_result(Pg) when is_binary(Pg) -> true;
valid_read_result(error_unwritten) -> true;
valid_read_result(error_trimmed) -> true;
valid_read_result(V) -> eqeq(V, {todoTODO_fixit,?LINE}).
run_commands_on_node(LocalOrSlave, Cmds, Seed) ->
AfterTime = if LocalOrSlave == local -> 50000;
LocalOrSlave == slave -> 1000000
end,
event_logger:start_link(),
pulse:start(),
delete_dir(?PROJECTION_DIR),
error_logger:tty(false),
error_logger:add_report_handler(handle_errors),
event_logger:start_logging(),
X =
try
{H, S, Res, Trace} = pulse:run(fun() ->
catch ets:new(?MY_TAB, [public, set, named_table]),
ets:insert(?MY_TAB, {?MY_KEY, undefined}),
%% application:start(my_test_app),
%% receive after AfterTime -> ok end,
{H, S, R} = run_parallel_commands(?MODULE, Cmds),
%% io:format(user, "Yooo: H = ~p\n", [H]),
%% io:format(user, "Yooo: S = ~p\n", [S]),
%% io:format(user, "Yooo: R = ~p\n", [R]),
receive after AfterTime -> ok end,
Trace = event_logger:get_events(),
%% receive after AfterTime -> ok end,
[{_, ThisRun}] = ets:lookup(?MY_TAB, ?MY_KEY),
[clean_up_runtime(ThisRun) || ThisRun /= undefined],
%% stop pulse controller *after* clean_up_runtime().
catch exit(pulse_application_controller, shutdown),
{H, S, R, Trace}
end, [{seed, Seed},
{strategy, unfair}]),
Schedule = pulse:get_schedule(),
Errors = gen_event:call(error_logger, handle_errors, get_errors, 60*1000),
{H, S, Res, Trace, Schedule, Errors}
catch
_:Err ->
{'EXIT', Err}
end,
X.
prop_pulse() ->
prop_pulse(local).
prop_pulse(LocalOrSlave) ->
?FORALL({NumChains, ChainLen, PageSize},
{choose(1, 3), choose(1, 3), choose(1, 16)},
begin
P = ?FORALL({Cmds, Seed},
{with_parameters([{num_chains, NumChains},
{chain_len, ChainLen},
{page_size, PageSize}], parallel_commands(?MODULE)),
pulse:seed()},
begin
case run_commands_on_node(LocalOrSlave, Cmds, Seed) of
{'EXIT', Err} ->
equals({'EXIT', Err}, ok);
{_H, S, Res, Trace, Schedule, Errors} ->
CheckTrace = check_trace(Trace, Cmds, Seed),
?WHENFAIL(
S = S, % ?QC_FMT("\nState: ~p\n", [S]),
measure(schedule, length(Schedule),
conjunction(
[{simple_result, equals(Res, ok)},
{errors, equals(Errors, [])},
{events, CheckTrace} ])))
end
end),
P
end).
prop_pulse_test_() ->
Timeout = case os:getenv("PULSE_TIME") of
false -> 60;
Val -> list_to_integer(Val)
end,
ExtraTO = case os:getenv("PULSE_SHRINK_TIME") of
false -> 0;
Val2 -> list_to_integer(Val2)
end,
io:format(user, "prop_pulse_test time: ~p + ~p seconds\n",
[Timeout, ExtraTO]),
{timeout, (Timeout+ExtraTO) + 60,
fun() ->
?assert(eqc:quickcheck(eqc:testing_time(Timeout,?QC_OUT(prop_pulse()))))
end}.
%% Example Trace0 (raw event info, from the ?LOG macro)
%%
%% [{32014,{call,<0.467.0>,{append,<<"O">>}}},
%% {32421,{call,<0.466.0>,{append,<<134>>}}},
%% {44522,{result,<0.467.0>,{ok,1}}},
%% {47651,{result,<0.466.0>,{ok,2}}}]
check_trace(Trace0, _Cmds, _Seed) ->
%% Let's treat this thing like a KV store. It is, mostly.
%% Key = LPN, Value = error_unwritten | {ok, Blob} | error_trimmed
%%
%% Problem: At {call, Pid, ...} time, we don't know what Key is!
%% We find out at {return, Pid, {ok, LSN}} time.
%% Also, the append might fail, so the model can ignore those
%% failures because they're not mutating any state that and
%% external viewer can see.
%% WARNING: Trace0 + lamport_clocks means Trace0 is not strictly sorted!
Trace = add_LPN_to_append_calls(lists:sort(Trace0)),
Events = eqc_temporal:from_timed_list(Trace),
%% Example Events, temporal style, 1 usec resolution, same as original trace
%%
%% [{0,32014,[]},
%% {32014,32015,[{call,<0.467.0>,{append,<<"O">>,will_be,1}}]},
%% {32015,32421,[]},
%% {32421,32422,[{call,<0.466.0>,{append,<<134>>,will_be,2}}]},
%% {32422,44522,[]},
%% {44522,44523,[{result,<0.467.0>,{ok,...}}]},
%% {44523,47651,[]},
%% {47651,47652,[{result,<0.466.0>,{ok,...}}]},
%% {47652,infinity,[]}]
Calls = eqc_temporal:stateful(
fun({call, _Pid, _Call} = I) -> [I] end,
fun({call, Pid, _Call}, {result, Pid, _}) -> [] end,
Events),
%% Example Calls (temporal map of when a call is in progress)
%%
%% [{0,32014,[]},
%% {32014,32421,[{call,<0.467.0>,{append,<<"O">>,will_be,1}}]},
%% {32421,44522,
%% [{call,<0.466.0>,{append,<<134>>,will_be,2}},{call,<0.467.0>,{append,<<"O">>,will_be,1}}]},
%% {44522,47651,[{call,<0.466.0>,{append,<<134>>,will_be,2}}]},
%% {47651,infinity,[]}]
AllLPNsR = eqc_temporal:stateful(
fun({call, _Pid, {append, _Pg, will_be, LPN}}) -> LPN;
({call, _Pid, {append, _Pg, will_fail, {special_trimmed, LPN}}}) -> LPN;
({call, _Pid, {read, LPN, _, _}}) -> LPN;
({call, _Pid, {fill, LPN, will_be, ok}}) -> LPN;
({call, _Pid, {trim, LPN, will_be, ok}}) -> LPN;
({call, _Pid, {goo_write, LPN, _Pg}}) -> LPN
end,
fun(x) -> [] end,
Calls),
%%io:format("Calls ~p\n", [Calls]),
%%io:format("AllLPNsR ~p\n", [AllLPNsR]),
%% The last item in the relation tells us what the final facts are in the
%% relation. In this case, it's all LPNs ever mentioned in the test run.
{_, infinity, AllLPNs} = lists:last(eqc_temporal:all_future(AllLPNsR)),
%% Use the following atoms to denote transitions ("Ttn") by an LPN:
%% w_0 = not written yet, error_unwritten
%% w_1 = written successfully, {ok, binary::()}
%% w_ft = fill trimmed, error_trimmed
%% w_tt = trim trimmed, error_trimmed
Mods = eqc_temporal:stateful(
fun({call, Pid, {append, Pg, will_be, LPN}}) ->
{mod_working, w_1, LPN, Pg, Pid};
({call, Pid, {append, Pg, will_fail, {special_trimmed, LPN}}}) ->
%% This is a special case for the model. We know that
%% a write raced with a trim and lost (at least some of
%% the time inside the chain). But the transition that
%% we model in this case is a special w_ type that is
%% is trated specially by the dictionary-making
%% creation of the ValuesR relation.
{mod_working, w_special_trimmed, LPN, Pg, Pid};
({call, Pid, {fill, LPN, will_be, ok}}) ->
{mod_working, w_ft, LPN, fill, Pid};
({call, Pid, {trim, LPN, will_be, ok}}) ->
{mod_working, w_tt, LPN, trim, Pid};
({call, Pid, {read, LPN, will_fail, error_trimmed}}) ->
{mod_working, w_tt, LPN, read_repair_maybe, Pid}
end,
fun({mod_working, _Ttn, _LPN, _Pg, _Pid}, {result, _Pid, _Res})->
[]
end,
Events),
%% StartMod contains {mod_start, Ttn, LPN, V} when a modification finished.
%% DoneMod contains {mod_end, Ttn, LPN, V} when a modification finished.
%% This is a clever trick: Mods contains the start & end timestamp
%% for each modification. Use shift() by 1 usec to move all timestamps
%% forward/backward 1 usec, then subtract away the original time range to
%% leave a 1 usec relation in time.
StartMod = eqc_temporal:map(
fun({mod_working, Ttn, LPN, Pg, _Pid}) ->
{mod_start, Ttn, LPN, Pg}
end,
eqc_temporal:subtract(Mods, eqc_temporal:shift(1, Mods))),
DoneMod = eqc_temporal:map(
fun({mod_working, Ttn, LPN, Pg, _Pid}) ->
{mod_end, Ttn, LPN, Pg}
end,
eqc_temporal:subtract(eqc_temporal:shift(1, Mods), Mods)),
StartsDones = eqc_temporal:union(StartMod, DoneMod),
%% TODO: A brighter mind than mine might figure out how to do this
%% next step using only eqc_temporal.
%%
%% We create a new relation, ValuesR. This relation contains
%% {values, OD::orddict()} for each time interval in the relation.
%% The OD contains all possible values for a particular LPN at
%% that time in the relation.
%% The key for OD is LPN, the value is an unordered list of possible values.
InitialValDict = orddict:from_list([{LPN, [error_unwritten]} ||
LPN <- AllLPNs]),
ValuesRFun =
fun({TS1, TS2, StEnds}, Dict1) ->
Dict2 = lists:foldl(
fun({mod_start, w_1, LPN, Pg}, D) ->
orddict:append(LPN, Pg, D);
({mod_start, WType, LPN, _Pg}, D)
when WType == w_ft; WType == w_tt ->
case lists:member(error_trimmed,
orddict:fetch(LPN, D)) of
true ->
D;
false ->
orddict:append(LPN, error_trimmed,D)
end;
({mod_start, w_special_trimmed, LPN, Pg}, D)->
orddict:append(LPN, Pg, D)
end, Dict1, [X || X={mod_start,_,_,_} <- StEnds]),
Dict3 = lists:foldl(
fun({mod_end, w_1, LPN, Pg}, D) ->
Vs1 = orddict:fetch(LPN, D),
%% We've written a page. error_unwriten is
%% now impossible; any other binary() is
%% also impossible. However, there may be
%% a trim operation that's still in flight!
Vs2 = [V || V <- Vs1, V /= error_unwritten,
not is_binary(V)],
orddict:store(LPN, [Pg|Vs2], D);
({mod_end, WType, LPN, _Pg}, D)
when WType == w_ft; WType == w_tt ->
orddict:store(LPN, [error_trimmed], D);
({mod_end, w_special_trimmed, LPN, Pg}, D) ->
orddict:store(LPN, [Pg,error_trimmed], D)
end, Dict2, [X || X={mod_end,_,_,_} <- StEnds]),
{{TS1, TS2, [{values, Dict3}]}, Dict3}
end,
{ValuesR, _} = lists:mapfoldl(ValuesRFun, InitialValDict, StartsDones),
InitialTtnDict = orddict:from_list([{LPN, [w_0]} || LPN <- AllLPNs]),
{TransitionsR, _} =
lists:mapfoldl(
fun({TS1, TS2, StEnds}, Dict1) ->
Dict2 = lists:foldl(
fun({mod_end, Ttn, LPN, _Pg}, D) ->
%% orddict does not discard duplicates
orddict:append(LPN, Ttn, D);
(_, D) ->
D
end, Dict1, [X || X={mod_end,_,_,_} <- StEnds]),
{{TS1, TS2, [{transitions, Dict2}]}, Dict2}
end, InitialTtnDict, StartsDones),
%% Checking reads is a tricky thing. My first attempt created a temporal
%% relation for the 1usec window when the read call was complete, then
%% union with the ValuesR relation to see what values were valid at that
%% particular instant. That approach fails sometimes!
%%
%% The reason is honest race conditions with a mutation: the model doesn't
%% know exactly when the data was written, so a valid value may have been
%% added/removed from the ValuesR relation that aren't there for the
%% 1usec window that intersects with ValuesR.
%%
%% Instead, we need to merge together all possible values from ValuesR
%% that appear at any time during the read op's lifetime.
PerhapsR = eqc_temporal:stateful(
fun({call, _Pid, {goo_write, LPN, Pg}}) ->
{perhaps, LPN, Pg}
end,
fun(x)-> [] end,
Events),
{_, _, Perhaps} = lists:last(eqc_temporal:all_future(PerhapsR)),
%%?QC_FMT("*Perhaps: ~p\n", [Perhaps]),
Reads = eqc_temporal:stateful(
fun({call, Pid, {read, LPN, _, _}}) ->
{read, Pid, LPN, []}
end,
fun({read, Pid, LPN, V1s}, {values, Values}) ->
{ok, V2s} = orddict:find(LPN, Values),
NewVs = lists:umerge(lists:sort(V1s),
lists:sort(V2s)),
%% Throw an exception (which is equivalent to a no-op)
%% if there are no differences: if we make multiples
%% of the exact same thing, stateful() will get confused.
false = NewVs == V1s,
{read, Pid, LPN, NewVs};
({read, Pid, LPN, Vs}, {result, Pid, Pg}) ->
%% case lists:member(Pg, Vs) orelse
%% lists:member({perhaps, LPN, Pg}, Perhaps) of
case lists:member(Pg, Vs) of
true ->
[];
false ->
case lists:member({perhaps, LPN, Pg}, Perhaps) of
true ->
%% The checking of the Perhaps list in
%% this manner is not strictly
%% temporally valid. It is possible
%% for the {perhaps,...} event to be
%% after the event we're checking here.
%% TODO work is to make this check 100%
%% temporally valid.
io:format(user, "Yo, found ~p ~p in Perhaps\n", [LPN, Pg]),
[];
false ->
[{bad, read, LPN, Pid, got, Pg,
possible, Vs}]
end
end
end, eqc_temporal:union(Events, ValuesR)),
BadFilter = fun(bad) -> true;
(Bad) when is_tuple(Bad), element(1, Bad) == bad -> true;
(_) -> false end,
BadReads = filter_relation_facts(BadFilter, Reads),
%% Property: For all LPNs, the transition list for K must be one of the
%% following four (4) acceptable transition orderings.
{_, _, [{transitions, FinalTtns}]} = lists:last(
eqc_temporal:all_future(TransitionsR)),
FinaTtns_filtered = filter_transition_trimfill_suffixes(FinalTtns),
InvalidTransitions = orddict:fold(
fun(_LPN, [w_0], Acc) ->
Acc;
(_LPN, [w_0,w_1], Acc) ->
Acc;
(_LPN, [w_0,'w_t+'], Acc) ->
Acc;
(_LPN, [w_0,w_1,'w_t+'], Acc) ->
Acc;
(LPN, BadTtns, Acc) ->
[{LPN, BadTtns}|Acc]
end, [], FinaTtns_filtered),
?WHENFAIL(begin
?QC_FMT("*Trace: ~p\n", [Trace]),
?QC_FMT("*ModsReads: ~p\n", [eqc_temporal:unions([Mods,Reads])]),
?QC_FMT("*InvalidTtns: ~p\n", [InvalidTransitions]),
?QC_FMT("*ValuesR: ~p\n", [eqc_temporal:unions([ValuesR, StartsDones])]),
?QC_FMT("*Calls: ~p\n", [Calls]),
?QC_FMT("*BadReads: ~p\n", [BadReads]),
?QC_FMT("*Perhaps: ~p\n", [Perhaps])
end,
conjunction(
[
{all_calls_finish,
eqc_temporal:is_false(eqc_temporal:all_future(Calls))},
{no_invalidTransitions,
InvalidTransitions == []},
{no_bad_reads,
eqc_temporal:is_false(eqc_temporal:all_future(BadReads))},
%% If you want to see PULSE causing crazy scheduling, then
%% change one of the "true orelse" -> "false orelse" below.
%% {bogus_no_gaps,
%% true orelse
%% (AppendLPNs == [] orelse length(range_ify(AppendLPNs)) == 1)},
%% {bogus_exactly_1_to_N,
%% true orelse (AppendLPNs == lists:seq(1, length(AppendLPNs)))},
{true, true}
])).
add_LPN_to_append_calls([{TS, {call, Pid, {append, Page}}}|Rest]) ->
Res = trace_lookahead_pid(Pid, Rest),
New = case Res of
{ok, LPN} ->
{TS, {call, Pid, {append, Page, will_be, LPN}}};
Else ->
{TS, {call, Pid, {append, Page, will_fail, Else}}}
end,
[New|add_LPN_to_append_calls(Rest)];
add_LPN_to_append_calls([{TS, {call, Pid, {OpName, LPN}}}|Rest])
when OpName == fill; OpName == trim ->
Res = trace_lookahead_pid(Pid, Rest),
New = case Res of
ok ->
{TS, {call, Pid, {OpName, LPN, will_be, ok}}};
Else ->
{TS, {call, Pid, {OpName, LPN, will_fail, Else}}}
end,
[New|add_LPN_to_append_calls(Rest)];
add_LPN_to_append_calls([{TS, {call, Pid, {read, LPN}}}|Rest]) ->
Res = trace_lookahead_pid(Pid, Rest),
New = case Res of
Page when is_binary(Page) ->
{TS, {call, Pid, {read, LPN, will_be, Page}}};
Else ->
{TS, {call, Pid, {read, LPN, will_fail, Else}}}
end,
[New|add_LPN_to_append_calls(Rest)];
add_LPN_to_append_calls([X|Rest]) ->
[X|add_LPN_to_append_calls(Rest)];
add_LPN_to_append_calls([]) ->
[].
trace_lookahead_pid(Pid, [{_TS, {result, Pid, Res}}|_]) ->
Res;
trace_lookahead_pid(Pid, [_H|T]) ->
trace_lookahead_pid(Pid, T).
%% Presenting command data statistics in a nicer way
command_data({set, _, {call, _, Fun, _}}, {_S, _V}) ->
Fun.
%% Convenience functions for running tests
test() ->
test({20, sec}).
test(N) when is_integer(N) ->
quickcheck(numtests(N, prop_pulse()));
test({Time, sec}) ->
quickcheck(eqc:testing_time(Time, prop_pulse()));
test({Time, min}) ->
test({Time * 60, sec});
test({Time, h}) ->
test({Time * 60, min}).
check() ->
check(current_counterexample()).
verbose() ->
verbose(current_counterexample()).
verbose(CE) ->
erlang:put(verbose, true),
Ok = check(CE),
erlang:put(verbose, false),
Ok.
check(CE) ->
check(on_output(fun("OK" ++ _, []) -> ok; (Fmt, Args) -> io:format(Fmt, Args) end,
prop_pulse(true == erlang:get(verbose))),
CE).
recheck() ->
recheck(prop_pulse()).
zipwith(F, [X|Xs], [Y|Ys]) ->
[F(X, Y)|zipwith(F, Xs, Ys)];
zipwith(_, _, _) -> [].
delete_dir(Dir) ->
corfurl_util:delete_dir(Dir).
clean_up_runtime(#run{flus=Flus, proj=P}) ->
%% io:format(user, "clean_up_runtime: run = ~p\n", [R]),
#proj{seq={Seq,_,_}} = P,
catch corfurl_sequencer:stop(Seq),
[catch corfurl_flu:stop(F) || F <- Flus],
corfurl_test:setup_del_all(length(Flus)),
delete_dir(?PROJECTION_DIR),
(catch exit(whereis(?SEQUENCER_NAME), kill)).
make_chains(ChainLen, FLUs) ->
make_chains(ChainLen, FLUs, [], []).
make_chains(_ChainLen, [], SmallAcc, BigAcc) ->
[lists:reverse(SmallAcc)|BigAcc];
make_chains(ChainLen, [H|T], SmallAcc, BigAcc) ->
if length(SmallAcc) == ChainLen ->
make_chains(ChainLen, T, [H], [lists:reverse(SmallAcc)|BigAcc]);
true ->
make_chains(ChainLen, T, [H|SmallAcc], BigAcc)
end.
setup(NumChains, ChainLen, PageSize, SeqType) ->
(catch exit(whereis(?SEQUENCER_NAME), kill)),
lamport_clock:init(),
N = NumChains * ChainLen,
FLUs = corfurl_test:setup_basic_flus(N, PageSize, ?MAX_PAGES),
{ok, Seq} = corfurl_sequencer:start_link(FLUs, SeqType),
Chains = make_chains(ChainLen, FLUs),
%% io:format(user, "Cs = ~p\n", [Chains]),
Proj = corfurl:new_simple_projection(?PROJECTION_DIR,
1, 1, ?MAX_PAGES, Chains),
ok = corfurl:save_projection(?PROJECTION_DIR, Proj),
error_overwritten = corfurl:save_projection(?PROJECTION_DIR, Proj),
1 = corfurl:latest_projection_epoch_number(?PROJECTION_DIR),
{ok, Proj} = corfurl:read_projection(?PROJECTION_DIR, 1),
Run = #run{proj=Proj#proj{seq={Seq, node(), ?SEQUENCER_NAME}},
flus=FLUs},
ets:insert(?MY_TAB, {?MY_KEY, Run}),
Run.
range_ify([]) ->
[];
range_ify(L) ->
[H|T] = lists:sort(L),
range_ify(H, H+1, T).
range_ify(Beginning, Next, [Next|T]) ->
range_ify(Beginning, Next+1, T);
range_ify(Beginning, Next, [Else|T]) ->
[{Beginning, to, Next-1}|range_ify(Else, Else+1, T)];
range_ify(Beginning, Next, []) ->
[{Beginning, to, Next-1}].
filter_relation_facts(FilterFun, R) ->
[{TS1, TS2, lists:filter(FilterFun, Facts)} || {TS1, TS2, Facts} <- R].
%% {TS1, TS2, Facts} <- Reads, Fact <- Facts, BadFilter(Fact)],
filter_transition_trimfill_suffixes(Ttns) ->
[{X, filter_1_transition_list(L)} || {X, L} <- Ttns].
filter_1_transition_list([]) ->
[];
filter_1_transition_list(Old) ->
%% Strategy: Chop off all of the w_* at the end, then look at **Old** to
%% see if we chopped off any. If we did chop off any, then add back a
%% constant 'w_t+' as a suffix.
New = lists:reverse(lists:dropwhile(fun(w_tt) -> true;
(w_ft) -> true;
(w_special_trimmed) -> true;
(_) -> false
end, lists:reverse(Old))),
Suffix = case lists:last(Old) of
w_ft -> ['w_t+'];
w_tt -> ['w_t+'];
w_special_trimmed -> ['w_t+'];
_ -> []
end,
New ++ Suffix.
log_make_call(Tag) ->
log_make_call(self(), Tag).
log_make_call(Pid, Tag) ->
{call, Pid, Tag}.
log_make_result(Result) ->
log_make_result(self(), Result).
log_make_result(Pid, Result) ->
{result, Pid, Result}.
pick_an_LPN(#proj{seq={Seq,_,_}} = P, SeedInt) ->
case (catch corfurl_sequencer:get(Seq, 0)) of
{ok, Max} ->
%% The sequencer may be lying to us, shouganai.
if SeedInt > Max -> (SeedInt rem Max) + 1;
true -> SeedInt
end;
_Else ->
pick_an_LPN(corfurl_client:restart_sequencer(P), SeedInt)
end.
-define(LOG3(Tag, MkCall, PostCall),
begin
LOG__Start = lamport_clock:get(),
event_logger:event(log_make_call(Tag), LOG__Start),
LOG__Result = MkCall,
LOG__End = lamport_clock:get(),
PostCall,
event_logger:event(log_make_result(LOG__Result), LOG__End),
LOG__Result
end).
-define(LOG(Tag, MkCall), ?LOG3(Tag, MkCall, okqq)).
append(#run{proj=OriginalProj}, Page) ->
lamport_clock:init(),
lamport_clock:incr(),
Proj = get_projection(OriginalProj),
?LOG3({append, Page},
try
corfurl_client:pulse_tracing_start(write),
{Res, Proj2} = corfurl_client:append_page(Proj, Page),
put_projection(Proj2),
OtherPages0 = lists:usort(corfurl_client:pulse_tracing_get(write)),
OtherPages = case Res of
{ok, LPN} ->
OtherPages0 -- [LPN];
_ ->
OtherPages0
end,
put(zzzOtherPages, OtherPages),
perhaps_trip_append_page(?TRIP_no_append_duplicates, Res, Page)
catch X:Y ->
{caught, ?MODULE, ?LINE, X, Y, erlang:get_stacktrace()}
end,
try
OPages = get(zzzOtherPages),
%%if OPages /= [] -> io:format("OPages = ~w\n", [OPages]); true -> ok end,
GooPid = {self(), goo, now()},
[begin
event_logger:event(log_make_call(GooPid, {goo_write, OP, Page}),
LOG__Start),
event_logger:event(log_make_result(GooPid, who_knows),
LOG__End)
end || OP <- OPages]
catch XX:YY ->
exit({oops, ?MODULE, ?LINE, XX, YY, erlang:get_stacktrace()})
end).
read_result_mangle({ok, Page}) ->
Page;
read_result_mangle(Else) ->
Else.
read_approx(#run{proj=OriginalProj}, SeedInt) ->
lamport_clock:init(),
lamport_clock:incr(),
Proj = get_projection(OriginalProj),
LPN = pick_an_LPN(Proj, SeedInt),
?LOG({read, LPN},
try
{Res, Proj2} = corfurl_client:read_page(Proj, LPN),
put_projection(Proj2),
Res2 = read_result_mangle(Res),
perhaps_trip_read_approx(?TRIP_bad_read, Res2, LPN)
catch X:Y ->
{caught, ?MODULE, ?LINE, X, Y, erlang:get_stacktrace()}
end).
scan_forward(#run{proj=OriginalProj}, SeedInt, NumPages) ->
lamport_clock:init(),
lamport_clock:incr(),
Proj = get_projection(OriginalProj),
StartLPN = if SeedInt == 1 -> 1;
true -> pick_an_LPN(Proj, SeedInt)
end,
%% Our job is complicated by the ?LOG() macro, which isn't good enough
%% for our purpose: we must lie about the starting timestamp, to make
%% it appear as if each LPN result that scan_forward() gives us came
%% instead from a single-page read_page() call.
?LOG({scan_forward, StartLPN, NumPages},
try
TS1 = lamport_clock:get(),
case corfurl_client:scan_forward(Proj, StartLPN, NumPages) of
{{Res, EndLPN, MoreP, Pages}, Proj2}
when Res == ok; Res == error_badepoch ->
put_projection(Proj2),
PageIs = lists:zip(Pages, lists:seq(1, length(Pages))),
TS2 = lamport_clock:get(),
[begin
PidI = {self(), s_f, I},
event_logger:event(log_make_call(PidI, {read, LPN}),
TS1),
Pm = perhaps_trip_scan_forward(
?TRIP_bad_scan_forward, read_result_mangle(P),
EndLPN),
event_logger:event(log_make_result(PidI, Pm), TS2)
end || {{LPN, P}, I} <- PageIs],
Ps = [{LPN, read_result_mangle(P)} ||
{LPN, P} <- Pages],
{ok, EndLPN, MoreP, Ps}
end
catch X:Y ->
{caught, ?MODULE, ?LINE, X, Y, erlang:get_stacktrace()}
end).
fill(#run{proj=OriginalProj}, SeedInt) ->
lamport_clock:init(),
lamport_clock:incr(),
Proj = get_projection(OriginalProj),
LPN = pick_an_LPN(Proj, SeedInt),
?LOG({fill, LPN},
try
{Res, Proj2} = corfurl_client:fill_page(Proj, LPN),
put_projection(Proj2),
perhaps_trip_fill_page(?TRIP_bad_fill, Res, LPN)
catch X:Y ->
{caught, ?MODULE, ?LINE, X, Y, erlang:get_stacktrace()}
end).
trim(#run{proj=OriginalProj}, SeedInt) ->
lamport_clock:init(),
lamport_clock:incr(),
Proj = get_projection(OriginalProj),
LPN = pick_an_LPN(Proj, SeedInt),
?LOG({trim, LPN},
try
{Res, Proj2} = corfurl_client:trim_page(Proj, LPN),
put_projection(Proj2),
perhaps_trip_trim_page(?TRIP_bad_trim, Res, LPN)
catch X:Y ->
{caught, ?MODULE, ?LINE, X, Y, erlang:get_stacktrace()}
end).
stop_sequencer(#run{proj=OriginalProj}, Method) ->
Proj = get_projection(OriginalProj),
Seq = element(1,Proj#proj.seq),
try
corfurl_sequencer:stop(Seq, Method),
ok
catch _:_ ->
ok
end.
get_projection(OriginalProj) ->
case get(projection) of
undefined ->
OriginalProj;
Proj ->
Proj
end.
put_projection(Proj) ->
put(projection, Proj).
perhaps_trip_append_page(false, Res, _Page) ->
Res;
perhaps_trip_append_page(true, {ok, LPN}, _Page) when LPN > 3 ->
io:format(user, "TRIP: append_page\n", []),
{ok, 3};
perhaps_trip_append_page(true, Else, _Page) ->
Else.
perhaps_trip_read_approx(false, Res, _LPN) ->
Res;
perhaps_trip_read_approx(true, _Res, 3 = LPN) ->
io:format(user, "TRIP: read_approx LPN ~p\n", [LPN]),
<<"FAKE!">>;
perhaps_trip_read_approx(true, Res, _LPN) ->
Res.
perhaps_trip_scan_forward(false, Res, _EndLPN) ->
Res;
perhaps_trip_scan_forward(true, _Res, 10) ->
io:format(user, "TRIP: scan_forward\n", []),
<<"magic number bingo, you are a winner">>;
perhaps_trip_scan_forward(true, Res, _EndLPN) ->
Res.
perhaps_trip_fill_page(false, Res, _EndLPN) ->
Res;
perhaps_trip_fill_page(true, _Res, LPN) when 3 =< LPN, LPN =< 5 ->
io:format(user, "TRIP: fill_page\n", []),
ok; % can trigger both invalid ttn and bad read
perhaps_trip_fill_page(true, Res, _EndLPN) ->
Res.
perhaps_trip_trim_page(false, Res, _EndLPN) ->
Res;
perhaps_trip_trim_page(true, _Res, LPN) when 3 =< LPN, LPN =< 5 ->
io:format(user, "TRIP: trim_page\n", []),
ok;
perhaps_trip_trim_page(true, Res, _EndLPN) ->
Res.
-endif. % PULSE
-endif. % TEST

View file

@ -0,0 +1,80 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(corfurl_sequencer_test).
-compile(export_all).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
-ifdef(PULSE).
-compile({parse_transform, pulse_instrument}).
-endif.
-endif.
-define(M, corfurl_sequencer).
-ifdef(TEST).
-ifndef(PULSE).
smoke_test() ->
BaseDir = "/tmp/" ++ atom_to_list(?MODULE) ++ ".",
PageSize = 8,
NumPages = 500,
NumFLUs = 4,
MyDir = fun(X) -> BaseDir ++ integer_to_list(X) end,
Del = fun() -> [ok = corfurl_util:delete_dir(MyDir(X)) ||
X <- lists:seq(1, NumFLUs)] end,
Del(),
FLUs = [begin
element(2, corfurl_flu:start_link(MyDir(X),
PageSize, NumPages*PageSize))
end || X <- lists:seq(1, NumFLUs)],
FLUsNums = lists:zip(FLUs, lists:seq(1, NumFLUs)),
try
[ok = corfurl_flu:write(FLU, 1, PageNum, <<42:(8*8)>>) ||
{FLU, PageNum} <- FLUsNums],
MLP0 = NumFLUs,
NumFLUs = ?M:get_max_logical_page(FLUs),
%% Excellent. Now let's start the sequencer and see if it gets
%% the same answer. If yes, then the first get will return MLP1,
%% yadda yadda.
MLP1 = MLP0 + 1,
MLP3 = MLP0 + 3,
MLP4 = MLP0 + 4,
{ok, Sequencer} = ?M:start_link(FLUs),
try
{ok, MLP1} = ?M:get(Sequencer, 2),
{ok, MLP3} = ?M:get(Sequencer, 1),
{ok, MLP4} = ?M:get(Sequencer, 1)
after
?M:stop(Sequencer)
end
after
[ok = corfurl_flu:stop(FLU) || FLU <- FLUs],
Del()
end.
-endif. % not PULSE
-endif. % TEST

View file

@ -0,0 +1,262 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(corfurl_test).
-include("corfurl.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
-define(M, corfurl).
%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%%
setup_flu_basedir() ->
"./tmp." ++
atom_to_list(?MODULE) ++ "." ++ os:getpid() ++ ".".
setup_flu_dir(N) ->
setup_flu_basedir() ++ integer_to_list(N).
setup_del_all(NumFLUs) ->
[ok = corfurl_util:delete_dir(setup_flu_dir(N)) ||
N <- lists:seq(1, NumFLUs)].
setup_basic_flus(NumFLUs, PageSize, NumPages) ->
setup_del_all(NumFLUs),
[begin
element(2, corfurl_flu:start_link(setup_flu_dir(X),
PageSize, NumPages * (PageSize * ?PAGE_OVERHEAD)))
end || X <- lists:seq(1, NumFLUs)].
-ifndef(PULSE).
save_read_test() ->
Dir = "/tmp/" ++ atom_to_list(?MODULE) ++".save-read",
PDir = Dir ++ ".projection",
Chain = [a,b],
P1 = ?M:new_simple_projection(PDir, 1, 1, 1*100, [Chain]),
try
filelib:ensure_dir(Dir ++ "/ignored"),
ok = ?M:save_projection(Dir, P1),
error_overwritten = ?M:save_projection(Dir, P1),
{ok, P1} = ?M:read_projection(Dir, 1),
error_unwritten = ?M:read_projection(Dir, 2),
ok
after
ok = corfurl_util:delete_dir(Dir),
ok = corfurl_util:delete_dir(PDir)
end.
smoke1_test() ->
PDir = "./tmp.smoke1.projection",
NumFLUs = 6,
PageSize = 8,
NumPages = 10,
FLUs = [F1, F2, F3, F4, F5, F6] =
setup_basic_flus(NumFLUs, PageSize, NumPages),
{ok, Seq} = corfurl_sequencer:start_link(FLUs),
%% We know that the first LPN will be 1.
LPN_Pgs = [{X, list_to_binary(
lists:flatten(io_lib:format("~8..0w", [X])))} ||
X <- lists:seq(1, 5)],
try
P0 = ?M:new_simple_projection(PDir, 1, 1, 1*100,
[[F1, F2, F3], [F4, F5, F6]]),
P1 = P0#proj{seq={Seq, unused, unused}},
[begin {{ok, LPN}, _} = corfurl_client:append_page(P1, Pg) end || {LPN, Pg} <- LPN_Pgs],
[begin {ok, Pg} = ?M:read_page(P1, LPN) end || {LPN, Pg} <- LPN_Pgs],
[begin
LPNplus = LPN + 1,
{ok, LPNplus, true, [{LPN, Pg}]} = ?M:scan_forward(P1, LPN, 1)
end || {LPN, Pg} <- LPN_Pgs],
{ok, 6, false, []} = ?M:scan_forward(P1, 6, 1),
{ok, 6, false, []} = ?M:scan_forward(P1, 6, 10),
[{LPN1,Pg1}, {LPN2,Pg2}, {LPN3,Pg3}, {LPN4,Pg4}, {LPN5,Pg5}] = LPN_Pgs,
{ok, 4, true, [{LPN2,Pg2}, {LPN3,Pg3}]} = ?M:scan_forward(P1, 2, 2),
{ok, 6, false, [{LPN3,Pg3}, {LPN4,Pg4}, {LPN5,Pg5}]} =
?M:scan_forward(P1, 3, 10),
%% Let's smoke read-repair: regular write failure
Epoch = P1#proj.epoch,
Pg6 = <<424242:(PageSize*8)>>,
%% Simulate a failed write to the chain.
[F6a, F6b, F6c] = Chain6 = ?M:project_to_chain(6, P1),
NotHead6 = [F6b, F6c],
ok = ?M:write_page_to_chain([F6a], [F6a], Epoch, 6, Pg6, 1),
%% Does the chain look as expected?
{ok, Pg6} = corfurl_flu:read(?M:flu_pid(F6a), Epoch, 6),
[error_unwritten = corfurl_flu:read(?M:flu_pid(X), Epoch, 6) ||
X <- NotHead6],
%% Read repair should fix it.
{ok, Pg6} = ?M:read_page(P1, 6),
[{ok, Pg6} = corfurl_flu:read(?M:flu_pid(X), Epoch, 6) || X <- Chain6],
%% Let's smoke read-repair: failed fill
[F7a, F7b, F7c] = Chain7 = ?M:project_to_chain(7, P1),
NotHead7 = [F7b, F7c],
ok = corfurl_flu:fill(?M:flu_pid(F7a), Epoch, 7),
%% Does the chain look as expected?
error_trimmed = corfurl_flu:read(?M:flu_pid(F7a), Epoch, 7),
[error_unwritten = corfurl_flu:read(?M:flu_pid(X), Epoch, 7) ||
X <- NotHead7],
%% Read repair should fix it.
error_trimmed = ?M:read_page(P1, 7),
[error_trimmed = corfurl_flu:read(?M:flu_pid(X), Epoch, 7) || X <- Chain7],
%% scan_forward shouldn't see it either
{ok, 8, false, [{6,Pg6}]} = ?M:scan_forward(P1, 6, 10),
[F8a|_] = Chain8 = ?M:project_to_chain(8, P1),
ok = corfurl_flu:fill(?M:flu_pid(F8a), Epoch, 8),
%% No read before scan, scan_forward shouldn't see 8 either,
%% but the next seq should be 9
{ok, 9, false, [{6,Pg6}]} = ?M:scan_forward(P1, 6, 10),
ok
after
corfurl_util:delete_dir(PDir),
corfurl_sequencer:stop(Seq),
[corfurl_flu:stop(F) || F <- FLUs],
setup_del_all(NumFLUs)
end.
smoke_append_badepoch_test() ->
PDir = "./tmp.smoke2.projection",
NumFLUs = 6,
PageSize = 8,
NumPages = 10,
FLUs = [F1, F2, F3, F4, F5, F6] =
setup_basic_flus(NumFLUs, PageSize, NumPages),
{ok, Seq} = corfurl_sequencer:start_link(FLUs),
%% We know that the first LPN will be 1.
LPN_Pgs = [{X, list_to_binary(
lists:flatten(io_lib:format("~8..0w", [X])))} ||
X <- lists:seq(1, 5)],
try
LittleEpoch = 4,
BigEpoch = 42,
P0 = ?M:new_simple_projection(PDir, BigEpoch, 1, 1*100,
[[F1, F2, F3], [F4, F5, F6]]),
P1 = P0#proj{seq={Seq, unused, unused}},
[begin {{ok, LPN}, _} = corfurl_client:append_page(P1, Pg) end || {LPN, Pg} <- LPN_Pgs],
[{ok, _} = corfurl_flu:seal(FLU, BigEpoch) || FLU <- FLUs],
{_LPN, Pg} = hd(LPN_Pgs),
{error_badepoch, _} = corfurl_client:append_page(P1, Pg),
P2 = P1#proj{epoch=LittleEpoch},
{error_badepoch, _} = corfurl_client:append_page(P2, Pg),
ok
after
corfurl_util:delete_dir(PDir),
corfurl_sequencer:stop(Seq),
[corfurl_flu:stop(F) || F <- FLUs],
setup_del_all(NumFLUs)
end.
-ifdef(TIMING_TEST).
forfun_test_() ->
{timeout, 99999, fun() ->
[forfun(Procs) || Procs <- [10,100,1000,5000]]
end}.
forfun_append(0, _P, _Page) ->
ok;
forfun_append(N, #proj{seq={Seq, _, _}} = P, Page) ->
{ok, _} = ?M:append_page(Seq, P, Page),
forfun_append(N - 1, P, Page).
%%% My MBP, SSD
%%% The 1K and 5K procs shows full-mailbox-scan ickiness
%%% when getting replies from prim_file. :-(
%%% forfun: 10 procs writing 200000 pages of 8 bytes/page to 2 chains of 4 total FLUs in 10.016815 sec
%%% forfun: 100 procs writing 200000 pages of 8 bytes/page to 2 chains of 4 total FLUs in 10.547976 sec
%%% forfun: 1000 procs writing 200000 pages of 8 bytes/page to 2 chains of 4 total FLUs in 13.706686 sec
%%% forfun: 5000 procs writing 200000 pages of 8 bytes/page to 2 chains of 4 total FLUs in 33.516312 sec
%%% forfun: 10 procs writing 200000 pages of 8 bytes/page to 4 chains of 4 total FLUs in 5.350147 sec
%%% forfun: 100 procs writing 200000 pages of 8 bytes/page to 4 chains of 4 total FLUs in 5.429485 sec
%%% forfun: 1000 procs writing 200000 pages of 8 bytes/page to 4 chains of 4 total FLUs in 5.643233 sec
%%% forfun: 5000 procs writing 200000 pages of 8 bytes/page to 4 chains of 4 total FLUs in 15.686058 sec
%%%% forfun: 10 procs writing 200000 pages of 4096 bytes/page to 2 chains of 4 total FLUs in 13.479458 sec
%%%% forfun: 100 procs writing 200000 pages of 4096 bytes/page to 2 chains of 4 total FLUs in 14.752565 sec
%%%% forfun: 1000 procs writing 200000 pages of 4096 bytes/page to 2 chains of 4 total FLUs in 25.012306 sec
%%%% forfun: 5000 procs writing 200000 pages of 4096 bytes/page to 2 chains of 4 total FLUs in 38.972076 sec
forfun(NumProcs) ->
PDir = "./tmp.forfun.projection",
io:format(user, "\n", []),
NumFLUs = 4,
PageSize = 8,
%%PageSize = 4096,
NumPages = 200*1000,
PagesPerProc = NumPages div NumProcs,
FLUs = [F1, F2, F3, F4] = setup_basic_flus(NumFLUs, PageSize, NumPages),
{ok, Seq} = corfurl_sequencer:start_link(FLUs),
try
Chains = [[F1, F2], [F3, F4]],
%%Chains = [[F1], [F2], [F3], [F4]],
P0 = ?M:new_simple_projection(PDir, 1, 1, NumPages*2, Chains),
P = P0#proj{seq={Seq, unused, unused}},
Me = self(),
Start = now(),
Ws = [begin
Page = <<X:(PageSize*8)>>,
spawn_link(fun() ->
forfun_append(PagesPerProc, P, Page),
Me ! {done, self()}
end)
end || X <- lists:seq(1, NumProcs)],
[receive {done, W} -> ok end || W <- Ws],
End = now(),
io:format(user, "forfun: ~p procs writing ~p pages of ~p bytes/page to ~p chains of ~p total FLUs in ~p sec\n",
[NumProcs, NumPages, PageSize, length(Chains), length(lists:flatten(Chains)), timer:now_diff(End, Start) / 1000000]),
ok
after
corfur_util:delete_dir(PDir),
corfurl_sequencer:stop(Seq),
[corfurl_flu:stop(F) || F <- FLUs],
setup_del_all(NumFLUs)
end.
-endif. % TIMING_TEST
-endif. % not PULSE
-endif. % TEST

View file

@ -0,0 +1,133 @@
%%% File : handle_errors.erl
%%% Author : Ulf Norell
%%% Description :
%%% Created : 26 Mar 2012 by Ulf Norell
-module(event_logger).
-compile(export_all).
-behaviour(gen_server).
%% API
-export([start_link/0, event/1, event/2, get_events/0, start_logging/0]).
-export([timestamp/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-record(state, { start_time, events = [] }).
-record(event, { timestamp, data }).
%%====================================================================
%% API
%%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server
%%--------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
start_logging() ->
gen_server:call(?MODULE, {start, timestamp()}).
event(EventData) ->
event(EventData, timestamp()).
event(EventData, Timestamp) ->
gen_server:call(?MODULE,
#event{ timestamp = Timestamp, data = EventData }).
async_event(EventData) ->
gen_server:cast(?MODULE,
#event{ timestamp = timestamp(), data = EventData }).
get_events() ->
gen_server:call(?MODULE, get_events).
%%====================================================================
%% gen_server callbacks
%%====================================================================
%%--------------------------------------------------------------------
%% Function: init(Args) -> {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% Description: Initiates the server
%%--------------------------------------------------------------------
init([]) ->
{ok, #state{}}.
%%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) ->
%% {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} |
%% {stop, Reason, State}
%% Description: Handling call messages
%%--------------------------------------------------------------------
handle_call(Event = #event{}, _From, State) ->
{reply, ok, add_event(Event, State)};
handle_call({start, Now}, _From, S) ->
{reply, ok, S#state{ events = [], start_time = Now }};
handle_call(get_events, _From, S) ->
{reply, lists:reverse([ {E#event.timestamp, E#event.data} || E <- S#state.events]),
S#state{ events = [] }};
handle_call(Request, _From, State) ->
{reply, {error, {bad_call, Request}}, State}.
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% Description: Handling cast messages
%%--------------------------------------------------------------------
handle_cast(Event = #event{}, State) ->
{noreply, add_event(Event, State)};
handle_cast(_Msg, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% Description: Handling all non call/cast messages
%%--------------------------------------------------------------------
handle_info(_Info, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> void()
%% Description: This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any necessary
%% cleaning up. When it returns, the gen_server terminates with Reason.
%% The return value is ignored.
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
ok.
%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Convert process state when code is changed
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
add_event(#event{timestamp = Now, data = Data}, State) ->
Event = #event{ timestamp = Now, data = Data },
State#state{ events = [Event|State#state.events] }.
timestamp() ->
lamport_clock:get().

View file

@ -0,0 +1,153 @@
%%%-------------------------------------------------------------------
%%% @author Hans Svensson <>
%%% @copyright (C) 2012, Hans Svensson
%%% @doc
%%%
%%% @end
%%% Created : 19 Mar 2012 by Hans Svensson <>
%%%-------------------------------------------------------------------
-module(handle_errors).
-behaviour(gen_event).
%% API
-export([start_link/0, add_handler/0]).
%% gen_event callbacks
-export([init/1, handle_event/2, handle_call/2,
handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-record(state, { errors = [] }).
%%%===================================================================
%%% gen_event callbacks
%%%===================================================================
%%--------------------------------------------------------------------
%% @doc
%% Creates an event manager
%%
%% @spec start_link() -> {ok, Pid} | {error, Error}
%% @end
%%--------------------------------------------------------------------
start_link() ->
gen_event:start_link({local, ?SERVER}).
%%--------------------------------------------------------------------
%% @doc
%% Adds an event handler
%%
%% @spec add_handler() -> ok | {'EXIT', Reason} | term()
%% @end
%%--------------------------------------------------------------------
add_handler() ->
gen_event:add_handler(?SERVER, ?MODULE, []).
%%%===================================================================
%%% gen_event callbacks
%%%===================================================================
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever a new event handler is added to an event manager,
%% this function is called to initialize the event handler.
%%
%% @spec init(Args) -> {ok, State}
%% @end
%%--------------------------------------------------------------------
init([]) ->
{ok, #state{}}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever an event manager receives an event sent using
%% gen_event:notify/2 or gen_event:sync_notify/2, this function is
%% called for each installed event handler to handle the event.
%%
%% @spec handle_event(Event, State) ->
%% {ok, State} |
%% {swap_handler, Args1, State1, Mod2, Args2} |
%% remove_handler
%% @end
%%--------------------------------------------------------------------
handle_event({error, _, {_, "Hintfile '~s' has bad CRC" ++ _, _}}, State) ->
{ok, State};
handle_event({error, _, {_, "** Generic server" ++ _, _}}, State) ->
{ok, State};
handle_event({error, _, {_, "Failed to merge ~p: ~p\n", [_, not_ready]}}, State) ->
{ok, State};
handle_event({error, _, {_, "Failed to merge ~p: ~p\n", [_, {merge_locked, _, _}]}}, State) ->
{ok, State};
handle_event({error, _, {_, "Failed to read lock data from ~s: ~p\n", [_, {invalid_data, <<>>}]}}, State) ->
{ok, State};
handle_event({error, _, Event}, State) ->
{ok, State#state{ errors = [Event|State#state.errors] }};
handle_event(_Event, State) ->
{ok, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever an event manager receives a request sent using
%% gen_event:call/3,4, this function is called for the specified
%% event handler to handle the request.
%%
%% @spec handle_call(Request, State) ->
%% {ok, Reply, State} |
%% {swap_handler, Reply, Args1, State1, Mod2, Args2} |
%% {remove_handler, Reply}
%% @end
%%--------------------------------------------------------------------
handle_call(get_errors, S) ->
{ok, S#state.errors, S#state{ errors = [] }};
handle_call(_Request, State) ->
Reply = ok,
{ok, Reply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% This function is called for each installed event handler when
%% an event manager receives any other message than an event or a
%% synchronous request (or a system message).
%%
%% @spec handle_info(Info, State) ->
%% {ok, State} |
%% {swap_handler, Args1, State1, Mod2, Args2} |
%% remove_handler
%% @end
%%--------------------------------------------------------------------
handle_info(_Info, State) ->
{ok, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever an event handler is deleted from an event manager, this
%% function is called. It should be the opposite of Module:init/1 and
%% do any necessary cleaning up.
%%
%% @spec terminate(Reason, State) -> void()
%% @end
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
ok.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Convert process state when code is changed
%%
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
%% @end
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================

View file

@ -0,0 +1,67 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(lamport_clock).
-export([init/0, get/0, update/1, incr/0]).
-define(KEY, ?MODULE).
-ifdef(TEST).
init() ->
case get(?KEY) of
undefined ->
%% {Ca, Cb, _} = now(),
%% FakeTOD = ((Ca * 1000000) + Cb) * 1000000,
FakeTOD = 0,
put(?KEY, FakeTOD + 1);
N when is_integer(N) ->
ok
end.
get() ->
get(?KEY).
update(Remote) ->
New = erlang:max(get(?KEY), Remote) + 1,
put(?KEY, New),
New.
incr() ->
New = get(?KEY) + 1,
put(?KEY, New),
New.
-else. % TEST
init() ->
ok.
get() ->
ok.
update(_) ->
ok.
incr() ->
ok.
-endif. % TEST