Merge branch 'slf/tango-cleanup1'

This commit is contained in:
Scott Lystig Fritchie 2015-03-03 16:31:13 +09:00
commit e1fcbd8bb0
23 changed files with 106 additions and 2320 deletions

View file

@ -41,5 +41,7 @@ TODO
## The tango prototype
TODO
A quick & dirty prototype of Tango on top of the `prototype/corfurl`
CORFU implementation. The implementation is powerful enough (barely)
to run concurrently on multiple Erlang nodes. See its `README.md`
file for limitations, TODO items, etc.

View file

@ -14,8 +14,7 @@ deps:
$(REBAR_BIN) get-deps
clean:
$(REBAR_BIN) clean
-for dir in deps/*; do (cd $$dir ; $(REBAR_BIN) clean); done
$(REBAR_BIN) -r clean
test: deps compile eunit

View file

@ -14,7 +14,7 @@ deps:
$(REBAR_BIN) get-deps
clean:
$(REBAR_BIN) clean
$(REBAR_BIN) -r clean
test: deps compile eunit
@ -25,9 +25,8 @@ pulse: compile
env USE_PULSE=1 $(REBAR_BIN) skip_deps=true clean compile
env USE_PULSE=1 $(REBAR_BIN) skip_deps=true -D PULSE eunit
APPS = kernel stdlib sasl erts ssl tools os_mon runtime_tools crypto inets \
xmerl webtool snmp public_key mnesia eunit syntax_tools compiler
PLT = $(HOME)/.dbms_dialyzer_plt
APPS = kernel stdlib sasl erts ssl compiler eunit
PLT = $(HOME)/.tango_dialyzer_plt
build_plt: deps compile
dialyzer --build_plt --output_plt $(PLT) --apps $(APPS) deps/*/ebin

View file

@ -1,6 +1,76 @@
Tango prototype TODO list
=========================
# Tango prototype
This is a quick hack, just to see how quick & easy it might be to
build Tango on top of corfurl. It turned out to be pretty quick and
easy.
This prototype does not include any datatype-specific APIs, such as an
HTTP REST interface for manipulating a queue. The current API is
native Erlang only. However, because the Tango client communicates with
the underlying CORFU log via the `corfurl` interface, this
implementation is powerful enough to run concurrently on multiple
Erlang nodes.
This implementation does not follow the same structure as described in
the Tango paper. I made some changes, based on some guesses/partial
understanding of the paper. If I were to start over again, I'd try to
use the exact same naming scheme & structure suggested by the paper.
## Testing environment
Tested using Erlang/OTP R16B and Erlang/OTP 17, both on OS X.
It ought to "just work" on other versions of Erlang and on other OS
platforms, but sorry, I haven't tested it.
Use `make` and `make test` to compile and run unit tests.
Note that the Makefile assumes that the rebar utility is available
somewhere in your path.
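For example, a typical build-and-test cycle (assuming `rebar` is
somewhere in your `$PATH`) looks like:

    make            # compile
    make test       # fetch deps, compile, and run the eunit suite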
## Data types implemented
* OID mapper
* Simple single-value register
* Map (i.e., multi-value register or basic key-value store)
* Queue
* Used the Erlang/OTP `queue.erl` library for rough inspiration; see the sketch after this list
* Operations: is_empty, length, peek, to_list, member, in, out,
reverse, filter.
* Queue mutation operations are not idempotent with respect to
multiple writes in the underlying CORFU log, e.g., due to CORFU
log reconfiguration or partial write error/timeouts.
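As a point of reference, here is a minimal sketch of those queue
operations against the stdlib `queue` module that served as
inspiration. This is plain OTP `queue`, not the prototype's
Tango-backed queue API (whose calls go through the shared CORFU log):

    %% Plain Erlang shell sketch of the stdlib queue.erl operations that the
    %% Tango queue mimics; this is not the prototype's own API.
    Q0 = queue:new(),
    true = queue:is_empty(Q0),
    Q1 = queue:in(a, Q0),                 % enqueue at the rear
    Q2 = queue:in(b, Q1),
    2 = queue:len(Q2),                    % "length"
    {value, a} = queue:peek(Q2),          % front element, queue unchanged
    [a, b] = queue:to_list(Q2),
    true = queue:member(b, Q2),
    {{value, a}, Q3} = queue:out(Q2),     % dequeue from the front
    Q4 = queue:reverse(Q3),
    Q5 = queue:filter(fun(X) -> X =/= b end, Q4),
    true = queue:is_empty(Q5).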
## Experimental idea: built-in OID checkpointing
I was toying with the idea of adding a Tango "history splicing"
operation that could make the implementation of per-OID checkpoint &
garbage collection (and CORFU-level trimming) operations much easier.
I think that this might be a very good idea and that it deserves more
research & work.
The implementation of the checkpointing & splicing as it is today is
flawed. See the TODO list below for more details.
## Information about the Tango paper
"Tango: Distributed Data Structures over a Shared Log"
Balakrishnan, Malkhi, Wobber, Wu, Prabhakaran, Wei, Davis, Rao, Zou, Zuck
Describes a framework for developing data structures that reside
persistently within a CORFU log: the log *is* the database/data
structure store.
http://www.snookles.com/scottmp/corfu/Tango.pdf
See also, `../corfu/docs/corfurl.md` for more information on CORFU
research papers.
## TODO list
__ The src/corfu* files in this sub-repo differ from the original
prototype source files in the ../corfu sub-repo, sorry!
__ The current checkpoint implementation is fundamentally broken and
needs a rewrite, or else.
@ -23,11 +93,12 @@ new OID has problems with updates written to the old OID before the
new checkpoint has finished.
I believe that a checkpoint where:
* all Tango writes, checkpoint and non-checkpoint alike, are noted with
a checkpoint number.
* that checkpoint number is strictly increasing
* a new checkpoint has a new checkpoint number
* scans ignore blocks with checkpoint numbers larger than the current
active checkpoint #, until the checkpoint is complete.
... ought to work correctly.
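Below is a minimal sketch of the scan-side rule just described. The
`#entry{}` record, `ActiveCkpt`, and `CompletedCkpts` are illustrative
assumptions only; they are not data structures that exist in the
prototype today:

    %% Hypothetical sketch of the proposed rule; the shapes here are assumed.
    -record(entry, {ckpt_num,   % checkpoint number stamped on every Tango write
                    body}).

    %% An entry is visible to a scan if its checkpoint number is at or below
    %% the currently active checkpoint, or if the later checkpoint it belongs
    %% to has already completed.
    visible_to_scan(#entry{ckpt_num=N}, ActiveCkpt, CompletedCkpts) ->
        N =< ActiveCkpt orelse lists:member(N, CompletedCkpts).

    scan_filter(Entries, ActiveCkpt, CompletedCkpts) ->
        [E || E <- Entries, visible_to_scan(E, ActiveCkpt, CompletedCkpts)].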

View file

@ -1,191 +0,0 @@
## CORFU papers
I recommend the "5 pages" paper below first, to give a flavor of
what CORFU is about. When Scott first read the CORFU paper
back in 2011 (and the Hyder paper), he thought it was insanity.
He recommends waiting before judging quite so hastily. :-)
After that, perhaps take a step back and skim over the
Hyder paper. Hyder started before CORFU, but since CORFU, the
Hyder folks at Microsoft have rewritten Hyder to use CORFU as
the shared log underneath it. But the Hyder paper has lots of
interesting bits about how you'd go about creating a distributed
DB where the transaction log *is* the DB.
### "CORFU: A Distributed Shared LogCORFU: A Distributed Shared Log"
MAHESH BALAKRISHNAN, DAHLIA MALKHI, JOHN D. DAVIS, and VIJAYAN
PRABHAKARAN, Microsoft Research Silicon Valley, MICHAEL WEI,
University of California, San Diego, TED WOBBER, Microsoft Research
Silicon Valley
Long version of introduction to CORFU (~30 pages)
http://www.snookles.com/scottmp/corfu/corfu.a10-balakrishnan.pdf
### "CORFU: A Shared Log Design for Flash Clusters"
Same authors as above
Short version of introduction to CORFU paper above (~12 pages)
http://www.snookles.com/scottmp/corfu/corfu-shared-log-design.nsdi12-final30.pdf
### "From Paxos to CORFU: A Flash-Speed Shared Log"
Same authors as above
5 pages, a short summary of CORFU basics and some trial applications
that have been implemented on top of it.
http://www.snookles.com/scottmp/corfu/paxos-to-corfu.malki-acmstyle.pdf
### "Beyond Block I/O: Implementing a Distributed Shared Log in Hardware"
Wei, Davis, Wobber, Balakrishnan, Malkhi
Summary report of implementing the CORFU server-side in
FPGA-style hardware. (~11 pages)
http://www.snookles.com/scottmp/corfu/beyond-block-io.CameraReady.pdf
### "Tango: Distributed Data Structures over a Shared Log"
Balakrishnan, Malkhi, Wobber, Wu, Prabhakaran, Wei, Davis, Rao, Zou, Zuck
Describes a framework for developing data structures that reside
persistently within a CORFU log: the log *is* the database/data
structure store.
http://www.snookles.com/scottmp/corfu/Tango.pdf
### "Dynamically Scalable, Fault-Tolerant Coordination on a Shared Logging Service"
Wei, Balakrishnan, Davis, Malkhi, Prabhakaran, Wobber
The ZooKeeper inter-server communication is replaced with CORFU.
Faster, fewer lines of code than ZK, and more features than the
original ZK code base.
http://www.snookles.com/scottmp/corfu/zookeeper-techreport.pdf
### "Hyder A Transactional Record Manager for Shared Flash"
Bernstein, Reid, Das
Describes a distributed log-based DB system where the txn log is
treated quite oddly: a "txn intent" record is written to a
shared common log. All participants read the shared log in
parallel and make commit/abort decisions in parallel, based on
what conflicts (or not) they see in the log. Scott's first
reading was "No way, wacky" ... but he has since changed his mind.
http://www.snookles.com/scottmp/corfu/CIDR2011Proceedings.pdf
pages 9-20
## Fiddling with PULSE
Do the following:
make clean
make
make pulse
... then watch the dots go across the screen for 60 seconds. If you
wish, you can press `Control-c` to interrupt the test. We're really
interested in the build artifacts.
erl -pz .eunit deps/*/ebin
eqc:quickcheck(eqc:testing_time(5, corfurl_pulse:prop_pulse())).
This will run the PULSE test for 5 seconds. Feel free to adjust for
as many seconds as you wish.
Erlang R16B02-basho4 (erts-5.10.3) [source] [64-bit] [smp:8:8] [async-threads:10] [hipe] [kernel-poll:false] [dtrace]
Eshell V5.10.3 (abort with ^G)
1> eqc:quickcheck(eqc:testing_time(5, corfurl_pulse:prop_pulse())).
Starting Quviq QuickCheck version 1.30.4
(compiled at {{2014,2,7},{9,19,50}})
Licence for Basho reserved until {{2014,2,17},{1,41,39}}
......................................................................................
OK, passed 86 tests
schedule: Count: 86 Min: 2 Max: 1974 Avg: 3.2e+2 Total: 27260
true
2>
REPL interactive work can be done via:
1. Edit code, e.g. `corfurl_pulse.erl`.
2. Run `env BITCASK_PULSE=1 ./rebar skip_deps=true -D PULSE eunit suites=SKIP`
to compile.
3. Reload any recompiled modules, e.g. `l(corfurl_pulse).`
4. Resume QuickCheck activities.
## Seeing a PULSE scheduler interleaving failure in action
1. Edit `corfurl_pulse:check_trace()` to uncomment the
use of `conjunction()` that mentions `bogus_order_check_do_not_use_me`
and comment out the real `conjunction()` call below it.
2. Recompile & reload.
3. Check.
For example:
9> eqc:quickcheck(eqc:testing_time(5, corfurl_pulse:prop_pulse())).
.........Failed! After 9 tests.
Sweet! The first tuple below holds the first `?FORALL()` values,
and the 2nd is the list of commands,
`{SequentialCommands, ListofParallelCommandLists}`. The 3rd is the
seed used to perturb the PULSE scheduler.
In this case, `SequentialCommands` has a single call (to `setup()`),
and there are two parallel procs: one makes 1 call
to `append()` and the other makes 2 calls to `append()`.
{2,2,9}
{{[{set,{var,1},{call,corfurl_pulse,setup,[2,2,9]}}],
[[{set,{var,3},
{call,corfurl_pulse,append,
[{var,1},<<231,149,226,203,10,105,54,223,147>>]}}],
[{set,{var,2},
{call,corfurl_pulse,append,
[{var,1},<<7,206,146,75,249,13,154,238,110>>]}},
{set,{var,4},
{call,corfurl_pulse,append,
[{var,1},<<224,121,129,78,207,23,79,216,36>>]}}]]},
{27492,46961,4884}}
Here are our results:
simple_result: passed
errors: passed
events: failed
identity: passed
bogus_order_check_do_not_use_me: failed
[{ok,1},{ok,3},{ok,2}] /= [{ok,1},{ok,2},{ok,3}]
Our (bogus!) order expectation was violated. Shrinking!
simple_result: passed
errors: passed
events: failed
identity: passed
bogus_order_check_do_not_use_me: failed
[{ok,1},{ok,3},{ok,2}] /= [{ok,1},{ok,2},{ok,3}]
Shrinking was able to remove one of the `append()` calls and to shrink
the size of the pages from 9 bytes down to 1 byte.
Shrinking........(8 times)
{1,1,1}
{{[{set,{var,1},{call,corfurl_pulse,setup,[1,1,1]}}],
[[{set,{var,3},{call,corfurl_pulse,append,[{var,1},<<0>>]}}],
[{set,{var,4},{call,corfurl_pulse,append,[{var,1},<<0>>]}}]]},
{27492,46961,4884}}
events: failed
bogus_order_check_do_not_use_me: failed
[{ok,2},{ok,1}] /= [{ok,1},{ok,2}]
false

View file

@ -1,35 +0,0 @@
msc {
client1, FLU1, FLU2, client2, client3;
client1 box client3 [label="Epoch #1: chain = FLU1 -> FLU2"];
client1 -> FLU1 [label="{write,epoch1,<<Page YYY>>}"];
client1 <- FLU1 [label="ok"];
client1 box client1 [label="Client crash", textcolour="red"];
FLU1 box FLU1 [label="FLU crash", textcolour="red"];
client1 box client3 [label="Epoch #2: chain = FLU2"];
client2 -> FLU2 [label="{write,epoch2,<<Page ZZZ>>}"];
client2 <- FLU2 [label="ok"];
client3 box client3 [label="Read repair starts", textbgcolour="aqua"];
client3 -> FLU2 [label="{read,epoch2}"];
client3 <- FLU2 [label="{ok,<<Page ZZZ>>}"];
client3 -> FLU1 [label="{write,epoch2,<<Page ZZZ>>}"];
FLU1 box FLU1 [label="What do we do here? Our current value is <<Page YYY>>.", textcolour="red"] ;
FLU1 box FLU1 [label="If we do not accept the repair value, then we are effectively UNREPAIRABLE.", textcolour="red"] ;
FLU1 box FLU1 [label="If we do accept the repair value, then we are mutating an already-written value.", textcolour="red"] ;
FLU1 -> client3 [label="I'm sorry, Dave, I cannot do that."];
FLU1 box FLU1 [label = "In theory, while repair is still happening, nobody will ever ask FLU1 for its value.", textcolour="black"] ;
client3 -> FLU1 [label="{write,epoch2,<<Page ZZZ>>,repair,witnesses=[FLU2]}", textbgcolour="silver"];
FLU1 box FLU1 [label="Start an async process to ask the witness list to corroborate this repair."];
FLU1 -> FLU2 [label="{read,epoch2}", textbgcolour="aqua"];
FLU1 <- FLU2 [label="{ok,<<Page ZZ>>}", textbgcolour="aqua"];
FLU1 box FLU1 [label="Overwrite local storage with repair page.", textbgcolour="silver"];
client3 <- FLU1 [label="Async proc replies: ok", textbgcolour="silver"];
}

View file

@ -1,92 +0,0 @@
## read-repair-race.1.
First attempt at using "mscgen" to make a Message Sequence
Chart (MSC) for a race found at commit 087c2605ab.
## read-repair-race.2.
Second attempt. This is almost exactly the trace that is
generated by this failing test case at commit 087c2605ab:
C2 = [{1,2,1},{{[{set,{var,1},{call,corfurl_pulse,setup,[1,2,1,standard]}}],[[{set,{var,3},{call,corfurl_pulse,append,[{var,1},<<0>>]}}],[{set,{var,2},{call,corfurl_pulse,read_approx,[{var,1},6201864198]}},{set,{var,5},{call,corfurl_pulse,append,[{var,1},<<0>>]}}],[{set,{var,4},{call,corfurl_pulse,append,[{var,1},<<0>>]}},{set,{var,6},{call,corfurl_pulse,trim,[{var,1},510442857]}}]]},{25152,1387,78241}},[{events,[[{no_bad_reads,[]}]]}]].
eqc:check(corfurl_pulse:prop_pulse(), C2).
## read-repair-race.2b.*
Same basic condition as read-repair-race.2, but edited
substantially to make it clearer what is happening.
Also for commit 087c2605ab.
I believe that I have a fix for the silver-colored
`error-overwritten` ... and it was indeed added to the code soon
afterward, but it turns out that it doesn't solve the entire problem
of "two clients try to write the exact same data at the same time to
the same LPN".
## "Two Clients Try to Write the Exact Same Data at the Same Time to the Same LPN"
This situation is something that CORFU cannot protect against, IMO.
I have been struggling for a while to find a way for CORFU
clients to *always* know when there is a conflict with another
writer. It usually works: the basic nature of write-once registers is
very powerful. However, in the case where two clients are trying to
write the same page data to the same LPN, it looks impossible to
resolve.
How do you tell the difference between:
1. A race between a client A writing page P at address LPN and
read-repair fixing P. P *is* A's data and no other's, so this race
doesn't confuse anyone.
1. A race between a client A writing page P at address LPN and client
B writing the exact same page data P at the same LPN.
A's page P = B's page P, but clients A & B don't know that.
If CORFU tells both A & B that they were successful, A & B assume
that the CORFU log has two new pages appended to it, but in truth
only one new page was appended.
If we try to solve this by always avoiding the same LPN address
conflict, we are deluding ourselves. If we assume that the sequencer
is 100% correct in that it never assigns the same LPN twice, and if we
assume that a client must never write a block without an assignment
from the sequencer, then the problem is solved. But that solution has a
_heavy_ price: the log is only available when the sequencer is
available, and only when no more than one sequencer is running at a
time.
The CORFU base system promises correct operation, even if:
* Zero sequencers are running, and clients might choose the same LPN
to write to.
* Two or more sequencers are running, and different sequencers
assign the same LPN to two different clients.
But CORFU's "correct" behavior does not include detecting the same
page at the same LPN. The papers don't specifically say it, alas.
But IMO it's impossible to guarantee, so all docs ought to explicitly
say that it's impossible and that clients must not assume it.
See also
* two-clients-race.1.png
## A scenario of chain repair & write-once registers
See:
* 2014-02-27.chain-repair-write-twice.png
... for a scenario where write-once registers that are truly only
write-once-ever-for-the-rest-of-the-future are "inconvenient" when it
comes to chain repair. Client 3 is attempting to do chain repair ops,
bringing FLU1 back into sync with FLU2.
The diagram proposes one possible idea for making overwriting a
write-once register a bit safer: ask another node in the chain to
verify that the page you've been asked to repair is exactly the same
as that other FLU's page.

View file

@ -1,49 +0,0 @@
msc {
"<0.12583.0>" [label="Client1"], "<0.12574.0>" [label="FLU1"], "<0.12575.0>" [label="FLU2"], "<0.12576.0>" [label="FLU3"], "<0.12584.0>" [label="Client2"], "<0.12585.0>" [label="Client3"];
"<0.12585.0>" -> "<0.12576.0>" [ label = "{read,1,1}" ] ;
"<0.12583.0>" -> "<0.12574.0>" [ label = "{write,1,1,<<0>>}" ] ;
"<0.12576.0>" -> "<0.12585.0>" [ label = "error_unwritten" ] ;
"<0.12585.0>" abox "<0.12585.0>" [ label="Read Repair starts", textbgcolour="yellow"];
"<0.12585.0>" -> "<0.12574.0>" [ label = "{read,1,1}" ] ;
"<0.12574.0>" -> "<0.12583.0>" [ label = "ok" ] ;
"<0.12583.0>" -> "<0.12575.0>" [ label = "{write,1,1,<<0>>}" ] ;
"<0.12574.0>" -> "<0.12585.0>" [ label = "{ok,<<0>>}" ,textcolour="red"] ;
"<0.12585.0>" -> "<0.12575.0>" [ label = "{write,1,1,<<0>>}" ] ;
"<0.12575.0>" -> "<0.12585.0>" [ label = "ok" ] ;
"<0.12585.0>" -> "<0.12576.0>" [ label = "{write,1,1,<<0>>}" ] ;
"<0.12575.0>" -> "<0.12583.0>" [ label = "error_overwritten" ] ;
"<0.12583.0>" abox "<0.12583.0>" [ label = "Race with read repair? Read to double-check", textbgcolour="yellow" ] ;
"<0.12583.0>" -> "<0.12575.0>" [ label = "{read,1,1}" ] ;
"<0.12576.0>" -> "<0.12585.0>" [ label = "ok" ] ;
"<0.12585.0>" abox "<0.12585.0>" [ label="Read Repair SUCCESS", textbgcolour="green"];
"<0.12585.0>" abox "<0.12585.0>" [ label="Our problem: the PULSE model never believes that append_page ever wrote LPN 1", textcolour="red"];
"<0.12584.0>" abox "<0.12584.0>" [ label = "Client2 decides to trim LPN 1", textbgcolour="orange" ] ;
"<0.12584.0>" -> "<0.12574.0>" [ label = "{trim,1,1}" ] ;
"<0.12575.0>" -> "<0.12583.0>" [ label = "{ok,<<0>>}"] ;
"<0.12583.0>" abox "<0.12583.0>" [ label = "Value matches, yay!", textbgcolour="yellow" ] ;
"<0.12583.0>" abox "<0.12583.0>" [ label = "Continue writing", textbgcolour="yellow" ] ;
"<0.12583.0>" -> "<0.12576.0>" [ label = "{write,1,1,<<0>>}" ] ;
"<0.12574.0>" -> "<0.12584.0>" [ label = "ok" ] ;
"<0.12584.0>" -> "<0.12575.0>" [ label = "{trim,1,1}" ] ;
"<0.12576.0>" -> "<0.12583.0>" [ label = "error_overwritten" ] ;
"<0.12583.0>" abox "<0.12583.0>" [ label = "Race with read repair? Read to double-check", textbgcolour="yellow" ] ;
"<0.12583.0>" -> "<0.12576.0>" [ label = "{read,1,1}" ] ;
"<0.12575.0>" -> "<0.12584.0>" [ label = "ok" ] ;
"<0.12584.0>" -> "<0.12576.0>" [ label = "{trim,1,1}" ] ;
"<0.12576.0>" -> "<0.12584.0>" [ label = "ok" ] ;
"<0.12576.0>" -> "<0.12583.0>" [ label = "error_trimmed" ] ;
"<0.12583.0>" abox "<0.12583.0>" [ label = "Value MISMATCH!", textcolour="red" ] ;
"<0.12583.0>" abox "<0.12583.0>" [ label = "Read repair", textbgcolour="yellow" ] ;
"<0.12583.0>" -> "<0.12574.0>" [ label = "{read,1,1}" ] ;
"<0.12574.0>" -> "<0.12583.0>" [ label = "error_trimmed" ] ;
"<0.12583.0>" -> "<0.12575.0>" [ label = "{fill,1,1}" ] ;
"<0.12575.0>" -> "<0.12583.0>" [ label = "error_trimmed" ] ;
"<0.12583.0>" -> "<0.12576.0>" [ label = "{fill,1,1}" ] ;
"<0.12576.0>" -> "<0.12583.0>" [ label = "error_trimmed" ] ;
"<0.12583.0>" abox "<0.12583.0>" [ label = "At this point, we give up on LPN 1.", textcolour="red" ] ;
"<0.12583.0>" abox "<0.12583.0>" [ label = "Sequencer gives us LPN 2", textbgcolour="yellow" ] ;
"<0.12583.0>" abox "<0.12583.0>" [ label = "LPN 2 has been filled (not shown).", textbgcolour="yellow" ] ;
"<0.12583.0>" abox "<0.12583.0>" [ label = "Sequencer gives us LPN 3", textbgcolour="yellow" ] ;
"<0.12583.0>" abox "<0.12583.0>" [ label = "We write LPN 3 successfully", textbgcolour="green" ] ;
}

View file

@ -1,60 +0,0 @@
msc {
"<0.32555.4>" [label="Client1"], "<0.32551.4>" [label="FLU1"], "<0.32552.4>" [label="FLU2"], "<0.32556.4>" [label="Client2"], "<0.32557.4>" [label="Client3"];
"<0.32555.4>" abox "<0.32555.4>" [ label = "Writer", textbgcolour="orange"],
"<0.32556.4>" abox "<0.32556.4>" [ label = "Reader", textbgcolour="orange"],
"<0.32557.4>" abox "<0.32557.4>" [ label = "Trimmer", textbgcolour="orange"];
"<0.32555.4>" abox "<0.32555.4>" [ label = "append_page()", textbgcolour="yellow"] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Sequencer assigns LPN 1", textbgcolour="yellow"] ;
"<0.32555.4>" -> "<0.32551.4>" [ label = "{write,1,1,<<0>>}" ] ;
"<0.32556.4>" abox "<0.32556.4>" [ label = "read_page(LPN 1)", textbgcolour="yellow"] ;
"<0.32556.4>" -> "<0.32552.4>" [ label = "{read,1,1}" ] ;
"<0.32552.4>" -> "<0.32556.4>" [ label = "error_unwritten" ] ;
"<0.32556.4>" abox "<0.32556.4>" [ label = "Start read repair", textbgcolour="aqua"] ;
"<0.32556.4>" -> "<0.32551.4>" [ label = "{read,1,1}" ] ;
"<0.32551.4>" -> "<0.32555.4>" [ label = "ok" ] ;
"<0.32551.4>" -> "<0.32556.4>" [ label = "{ok,<<0>>}" ] ;
"<0.32556.4>" -> "<0.32552.4>" [ label = "{write,1,1,<<0>>}" ] ;
"<0.32555.4>" -> "<0.32552.4>" [ label = "{write,1,1,<<0>>}" ] ;
"<0.32557.4>" -> "<0.32551.4>" [ label = "{trim,1,1}" ] ;
"<0.32552.4>" -> "<0.32555.4>" [ label = "error_overwritten" ] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Our attempt to write LPN 1 is interrupted", textbgcolour="yellow"] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Check if an eager read-repair has written our data for us.", textbgcolour="yellow"] ;
"<0.32555.4>" -> "<0.32552.4>" [ label = "{read,1,1}" ] ;
"<0.32551.4>" -> "<0.32557.4>" [ label = "ok" ] ;
"<0.32552.4>" -> "<0.32556.4>" [ label = "ok" ] ;
"<0.32556.4>" abox "<0.32556.4>" [ label = "End read repair", textbgcolour="aqua"] ;
"<0.32556.4>" abox "<0.32556.4>" [ label = "read_page(LPN 1) -> {ok, <<0>>}", textbgcolour="yellow"] ;
"<0.32556.4>" abox "<0.32556.4>" [ label = "See red stuff at bottom....", textcolour="red"] ;
# "<0.32556.4>" abox "<0.32556.4>" [ label = "But PULSE thinks that LPN 1 was never written.", textcolour="red"] ;
# "<0.32556.4>" abox "<0.32556.4>" [ label = "Fixing this requires ... lots of pondering...", textcolour="red"] ;
"<0.32557.4>" -> "<0.32552.4>" [ label = "{trim,1,1}" ] ;
"<0.32552.4>" -> "<0.32557.4>" [ label = "ok" ] ;
"<0.32552.4>" -> "<0.32555.4>" [ label = "error_trimmed" ] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Wow, an eager trimmer got us, ouch.", textbgcolour="yellow"] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Start read repair", textbgcolour="aqua"] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Read repair here is for sanity checking, not really necessary.", textbgcolour="yellow"] ;
"<0.32555.4>" -> "<0.32551.4>" [ label = "{read,1,1}" ] ;
"<0.32551.4>" -> "<0.32555.4>" [ label = "error_trimmed" ] ;
"<0.32555.4>" -> "<0.32552.4>" [ label = "{fill,1,1}" ] ;
"<0.32552.4>" -> "<0.32555.4>" [ label = "error_trimmed" ] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "End read repair", textbgcolour="aqua"] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Our attempt to write LPN 1 has failed. Must ask sequencer for a new LPN.", textbgcolour="yellow"] ;
"<0.32551.4>" abox "<0.32552.4>" [ label = "LPN 2 is written (race details omitted)", textbgcolour="orange"] ;
"<0.32551.4>" abox "<0.32552.4>" [ label = "LPN 3 is written (race details omitted)", textbgcolour="orange"] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Sequencer assigns LPN 4", textbgcolour="yellow"] ;
"<0.32555.4>" -> "<0.32551.4>" [ label = "{write,1,4,<<0>>}" ] ;
"<0.32551.4>" -> "<0.32555.4>" [ label = "ok" ] ;
"<0.32555.4>" -> "<0.32552.4>" [ label = "{write,1,4,<<0>>}" ] ;
"<0.32552.4>" -> "<0.32555.4>" [ label = "ok" ] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "append_page() -> LPN 4", textbgcolour="yellow"] ;
"<0.32555.4>" abox "<0.32557.4>" [ label="Small problem: the PULSE model never believes that append_page ever wrote LPN 1", textcolour="red"];
"<0.32556.4>" abox "<0.32556.4>" [ label = "read_page(LPN 1)", textbgcolour="yellow"] ;
"<0.32556.4>" -> "<0.32552.4>" [ label = "{read,1,4}" ] ;
"<0.32552.4>" -> "<0.32556.4>" [ label = "{ok,<<0>>}" ] ;
"<0.32556.4>" abox "<0.32556.4>" [ label = "read_page(LPN 4) -> {ok, <<0>>}", textbgcolour="yellow"] ;
"<0.32555.4>" abox "<0.32557.4>" [ label="Big problem: Client2 has witnessed the same page written at LPN 1 and at LPN 4.", textcolour="red"];
"<0.32555.4>" abox "<0.32557.4>" [ label="", textcolour="red"];
"<0.32555.4>" abox "<0.32557.4>" [ label="", textcolour="red"];
}

View file

@ -1,57 +0,0 @@
msc {
"<0.32555.4>" [label="Client1"], "<0.32551.4>" [label="FLU1=Head"], "<0.32552.4>" [label="FLU2=Tail"], "<0.32556.4>" [label="Client2"], "<0.32557.4>" [label="Client3"];
"<0.32555.4>" abox "<0.32555.4>" [ label = "Writer", textbgcolour="orange"],
"<0.32556.4>" abox "<0.32556.4>" [ label = "Reader", textbgcolour="orange"],
"<0.32557.4>" abox "<0.32557.4>" [ label = "Trimmer", textbgcolour="orange"];
"<0.32555.4>" abox "<0.32555.4>" [ label = "append_page()", textbgcolour="yellow"] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Sequencer assigns LPN 1", textbgcolour="yellow"] ;
"<0.32555.4>" -> "<0.32551.4>" [ label = "{write,1,1,<<0>>}" ] ;
"<0.32551.4>" -> "<0.32555.4>" [ label = "ok" ] ;
"<0.32556.4>" abox "<0.32556.4>" [ label = "read_page(LPN 1)", textbgcolour="yellow"] ;
"<0.32556.4>" -> "<0.32552.4>" [ label = "{read,1,1}" ] ;
"<0.32552.4>" -> "<0.32556.4>" [ label = "error_unwritten" ] ;
"<0.32556.4>" abox "<0.32556.4>" [ label = "Start read repair", textbgcolour="aqua"] ;
"<0.32556.4>" -> "<0.32551.4>" [ label = "{read,1,1}" ] ;
"<0.32551.4>" -> "<0.32556.4>" [ label = "{ok,<<0>>}" ] ;
"<0.32556.4>" -> "<0.32552.4>" [ label = "{write,1,1,<<0>>}" ] ;
"<0.32552.4>" -> "<0.32556.4>" [ label = "ok" ] ;
"<0.32556.4>" abox "<0.32556.4>" [ label = "End read repair", textbgcolour="aqua"] ;
"<0.32556.4>" abox "<0.32556.4>" [ label = "read_page(LPN 1) -> {ok, <<0>>}", textbgcolour="yellow"] ;
"<0.32556.4>" abox "<0.32556.4>" [ label = "See red stuff at bottom....", textcolour="red"] ;
# "<0.32556.4>" abox "<0.32556.4>" [ label = "But PULSE thinks that LPN 1 was never written.", textcolour="red"] ;
# "<0.32556.4>" abox "<0.32556.4>" [ label = "Fixing this requires ... lots of pondering...", textcolour="red"] ;
"<0.32557.4>" -> "<0.32551.4>" [ label = "{trim,1,1}" ] ;
"<0.32551.4>" -> "<0.32557.4>" [ label = "ok" ] ;
"<0.32557.4>" -> "<0.32552.4>" [ label = "{trim,1,1}" ] ;
"<0.32552.4>" -> "<0.32557.4>" [ label = "ok" ] ;
"<0.32555.4>" -> "<0.32552.4>" [ label = "{write,1,1,<<0>>}" ] ;
"<0.32552.4>" -> "<0.32555.4>" [ label = "error_overwritten", textbgcolour="silver" ] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Our attempt to write LPN 1 is interrupted", textbgcolour="yellow"] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Check if an eager read-repair has written our data for us.", textbgcolour="yellow"] ;
"<0.32555.4>" -> "<0.32552.4>" [ label = "{read,1,1}" ] ;
"<0.32552.4>" -> "<0.32555.4>" [ label = "error_trimmed" ] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Wow, an eager trimmer got us, ouch.", textbgcolour="yellow"] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Start read repair", textbgcolour="aqua"] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Read repair here is for sanity checking, not really necessary.", textbgcolour="yellow"] ;
"<0.32555.4>" -> "<0.32551.4>" [ label = "{read,1,1}" ] ;
"<0.32551.4>" -> "<0.32555.4>" [ label = "error_trimmed" ] ;
"<0.32555.4>" -> "<0.32552.4>" [ label = "{fill,1,1}" ] ;
"<0.32552.4>" -> "<0.32555.4>" [ label = "error_trimmed" ] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "End read repair", textbgcolour="aqua"] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Our attempt to write LPN 1 has failed. Must ask sequencer for a new LPN.", textbgcolour="yellow"] ;
"<0.32551.4>" abox "<0.32552.4>" [ label = "LPN 2 and 3 are written (race details omitted)", textbgcolour="orange"] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "Sequencer assigns LPN 4", textbgcolour="yellow"] ;
"<0.32555.4>" -> "<0.32551.4>" [ label = "{write,1,4,<<0>>}" ] ;
"<0.32551.4>" -> "<0.32555.4>" [ label = "ok" ] ;
"<0.32555.4>" -> "<0.32552.4>" [ label = "{write,1,4,<<0>>}" ] ;
"<0.32552.4>" -> "<0.32555.4>" [ label = "ok" ] ;
"<0.32555.4>" abox "<0.32555.4>" [ label = "append_page() -> LPN 4", textbgcolour="yellow"] ;
"<0.32555.4>" abox "<0.32557.4>" [ label="Small problem: the PULSE model never believes that append_page ever wrote LPN 1", textcolour="red"];
"<0.32556.4>" abox "<0.32556.4>" [ label = "read_page(LPN 1)", textbgcolour="yellow"] ;
"<0.32556.4>" -> "<0.32552.4>" [ label = "{read,1,4}" ] ;
"<0.32552.4>" -> "<0.32556.4>" [ label = "{ok,<<0>>}" ] ;
"<0.32556.4>" abox "<0.32556.4>" [ label = "read_page(LPN 4) -> {ok, <<0>>}", textbgcolour="yellow"] ;
"<0.32555.4>" abox "<0.32557.4>" [ label="Big problem: Client2 has witnessed the same page written at LPN 1 and at LPN 4.", textcolour="red"];
}

View file

@ -1,33 +0,0 @@
msc {
client1, FLU1, FLU2, client2, client3;
client1 -> FLU1 [label="{write,epoch1,<<Not unique page>>}"];
client1 <- FLU1 [label="ok"];
client3 -> FLU2 [label="{seal,epoch1}"];
client3 <- FLU2 [label="{ok,...}"];
client3 -> FLU1 [label="{seal,epoch1}"];
client3 <- FLU1 [label="{ok,...}"];
client2 -> FLU1 [label="{write,epoch1,<<Not unique page>>}"];
client2 <- FLU1 [label="error_epoch"];
client2 abox client2 [label="Ok, get the new epoch info....", textbgcolour="silver"];
client2 -> FLU1 [label="{write,epoch2,<<Not unique page>>}"];
client2 <- FLU1 [label="error_overwritten"];
client1 -> FLU2 [label="{write,epoch1,<<Not unique page>>}"];
client1 <- FLU2 [label="error_epoch"];
client1 abox client1 [label="Ok, hrm.", textbgcolour="silver"];
client3 abox client3 [ label = "Start read repair", textbgcolour="aqua"] ;
client3 -> FLU1 [label="{read,epoch2}"];
client3 <- FLU1 [label="{ok,<<Not unique page>>}"];
client3 -> FLU2 [label="{write,epoch2,<<Not unique page>>}"];
client3 <- FLU2 [label="ok"];
client3 abox client3 [ label = "End read repair", textbgcolour="aqua"] ;
client3 abox client3 [ label = "We saw <<Not unique page>>", textbgcolour="silver"] ;
client1 -> FLU2 [label="{write,epoch2,<<Not unique page>>}"];
client1 <- FLU2 [label="error_overwritten"];
}

View file

@ -0,0 +1,6 @@
%%% {erl_opts, [warnings_as_errors, {parse_transform, lager_transform}, debug_info]}.
{erl_opts, [{parse_transform, lager_transform}, debug_info]}.
{deps, [
{lager, "2.0.1", {git, "git://github.com/basho/lager.git", {tag, "2.0.1"}}}
]}.

View file

@ -0,0 +1,9 @@
{application, tango, [
{description, "Really quick hack prototype of Tango on top of corfurl."},
{vsn, "0.0.0"},
{applications, [kernel, stdlib, lager]},
{mod,{tango_does_not_exist_app,[]}},
{registered, []},
{env, [
]}
]}.

View file

@ -31,7 +31,8 @@
-define(LONG_TIME, 30*1000).
-define(D(X), io:format(user, "Dbg: ~s =\n ~p\n", [??X, X])).
-define(D(X), ok).
%% -define(D(X), io:format(user, "Dbg: ~s =\n ~p\n", [??X, X])).
-type lpn() :: non_neg_integer().

View file

@ -1,135 +0,0 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(corfurl_flu_test).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
-endif.
-include("corfurl.hrl").
-define(M, corfurl_flu).
-ifdef(TEST).
-ifndef(PULSE).
startstop_test() ->
Dir = "/tmp/flu." ++ os:getpid(),
{ok, P1} = ?M:start_link(Dir),
try
{ok, _} = ?M:status(P1),
ok = ?M:stop(P1),
{'EXIT', _} = (catch ?M:stop(P1)),
{ok, P2} = ?M:start_link(Dir),
0 = ?M:get__mlp(P2),
0 = ?M:get__min_epoch(P2),
ok = ?M:stop(P2),
ok
after
ok = corfurl_util:delete_dir(Dir)
end.
basic_test() ->
Dir = "/tmp/flu." ++ os:getpid(),
{ok, P1} = ?M:start_link(Dir),
try
Epoch1 = 1,
Epoch2 = 2,
Epoch3 = 3,
LPN = 1,
Bin1 = <<42:64>>,
Bin2 = <<42042:64>>,
error_unwritten = ?M:read(P1, Epoch1, LPN),
error_unwritten = ?M:trim(P1, Epoch1, LPN),
error_unwritten = ?M:trim(P1, Epoch1, LPN+77),
ok = ?M:write(P1, Epoch1, LPN, Bin1),
error_overwritten = ?M:write(P1, Epoch1, LPN, Bin1),
error_overwritten = ?M:fill(P1, Epoch1, LPN),
LPN = ?M:get__mlp(P1),
0 = ?M:get__min_epoch(P1),
0 = ?M:get__trim_watermark(P1),
{ok, LPN} = ?M:seal(P1, Epoch1),
2 = ?M:get__min_epoch(P1),
error_overwritten = ?M:write(P1, Epoch2, LPN, Bin1),
ok = ?M:write(P1, Epoch2, LPN+1, Bin2),
Epoch2 = ?M:get__min_epoch(P1),
error_badepoch = ?M:read(P1, Epoch1, LPN),
{ok, Bin2} = ?M:read(P1, Epoch2, LPN+1),
error_unwritten = ?M:read(P1, Epoch2, LPN+2),
badarg = ?M:read(P1, Epoch2, 1 bsl 2982),
error_badepoch = ?M:seal(P1, Epoch1),
{ok, _} = ?M:seal(P1, Epoch2),
error_badepoch = ?M:seal(P1, Epoch2),
error_badepoch = ?M:read(P1, Epoch1, LPN),
error_badepoch = ?M:read(P1, Epoch1, LPN+1),
{ok, Bin1} = ?M:read(P1, Epoch3, LPN),
{ok, Bin2} = ?M:read(P1, Epoch3, LPN+1),
error_badepoch = ?M:trim(P1, Epoch1, LPN+1),
ok = ?M:trim(P1, Epoch3, LPN+1),
error_trimmed = ?M:trim(P1, Epoch3, LPN+1),
%% Current watermark processing is broken. But we'll test what's
%% there now.
ExpectedWaterFixMe = LPN+1,
ExpectedWaterFixMe = ?M:get__trim_watermark(P1),
ok = ?M:fill(P1, Epoch3, LPN+3),
error_trimmed = ?M:read(P1, Epoch3, LPN+3),
error_trimmed = ?M:fill(P1, Epoch3, LPN+3),
error_trimmed = ?M:trim(P1, Epoch3, LPN+3),
Epoch3 = ?M:get__min_epoch(P1),
ok = ?M:stop(P1),
ok
after
ok = corfurl_util:delete_dir(Dir)
end.
seal_persistence_test() ->
Dir = "/tmp/flu." ++ os:getpid(),
{ok, P1} = ?M:start_link(Dir),
try
0 = ?M:get__min_epoch(P1),
Epoch = 665,
{ok, LPN} = ?M:seal(P1, Epoch-1),
Epoch = ?M:get__min_epoch(P1),
ok = ?M:stop(P1),
{ok, P2} = ?M:start_link(Dir),
Epoch = ?M:get__min_epoch(P2),
ok = ?M:stop(P2),
ok
after
ok = corfurl_util:delete_dir(Dir)
end.
-endif. % not PULSE
-endif. % TEST

View file

@ -1,950 +0,0 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(corfurl_pulse).
-ifdef(TEST).
-ifdef(PULSE).
-compile(export_all).
-include_lib("eqc/include/eqc.hrl").
-include_lib("eqc/include/eqc_statem.hrl").
-include("corfurl.hrl").
-include_lib("eunit/include/eunit.hrl").
-compile({parse_transform, pulse_instrument}).
-compile({pulse_skip,[{prop_pulse_test_,0},{clean_up_runtime,1},{delete_dir,1}]}).
%% -compile({pulse_no_side_effect,[{file,'_','_'}, {erlang, now, 0}]}).
%% Used for output within EUnit...
-define(QC_FMT(Fmt, Args),
io:format(user, Fmt, Args)).
%% And to force EUnit to output QuickCheck output...
-define(QC_OUT(P),
eqc:on_output(fun(Str, Args) -> ?QC_FMT(Str, Args) end, P)).
-define(MAX_PAGES, 50000).
-define(MY_TAB, i_have_a_name).
-define(MY_KEY, ?MY_TAB).
-define(PROJECTION_DIR, "./tmp.projection." ++ os:getpid()).
-define(SEQUENCER_NAME, 'corfurl pulse seq thingie').
-record(run, {
proj, % Projection
flus % List of FLUs
}).
-record(state, {
is_setup = false :: boolean(),
num_chains = 0 :: integer(),
chain_len = 0 :: integer(),
page_size = 0 :: integer(),
run :: #run{}
}).
%% Model testing things:
%% Define true to fake bad behavior that model **must** notice & fail!
-ifndef(TRIP_no_append_duplicates).
-define(TRIP_no_append_duplicates, false).
-endif.
-ifndef(TRIP_bad_read).
-define(TRIP_bad_read, false).
-endif.
-ifndef(TRIP_bad_scan_forward).
-define(TRIP_bad_scan_forward, false).
-endif.
-ifndef(TRIP_bad_fill).
-define(TRIP_bad_fill, false).
-endif.
-ifndef(TRIP_bad_trim).
-define(TRIP_bad_trim, false).
-endif.
initial_state() ->
#state{}.
gen_page(PageSize) ->
binary(PageSize).
gen_seed() ->
noshrink({choose(1, 20000), choose(1, 20000), choose(1, 20000)}).
gen_sequencer_percent() ->
frequency([{10, choose(1,100)},
{5, choose(90,100)}]).
gen_sequencer() ->
frequency([{100, standard},
{50, {gen_seed(), gen_sequencer_percent(), choose(1, 2)}}]).
gen_approx_page() ->
%% EQC can't know what pages are perhaps-written, so pick something big.
noshrink(?LET(I, largeint(), abs(I))).
gen_scan_forward_start() ->
oneof([1, gen_approx_page()]).
gen_stop_method() ->
oneof([stop, kill]).
command(#state{run=Run} = S) ->
?LET({NumChains, ChainLen, PageSize},
{parameter(num_chains), parameter(chain_len), parameter(page_size)},
frequency(
[{50, {call, ?MODULE, setup, [NumChains, ChainLen, PageSize, gen_sequencer()]}}
|| not S#state.is_setup] ++
[{50, {call, ?MODULE, append, [Run, gen_page(PageSize)]}}
|| S#state.is_setup] ++
[{15, {call, ?MODULE, read_approx, [Run, gen_approx_page()]}}
|| S#state.is_setup] ++
[{15, {call, ?MODULE, scan_forward, [Run, gen_scan_forward_start(), nat()]}}
|| S#state.is_setup] ++
[{12, {call, ?MODULE, fill, [Run, gen_approx_page()]}}
|| S#state.is_setup] ++
[{12, {call, ?MODULE, trim, [Run, gen_approx_page()]}}
|| S#state.is_setup] ++
[{10, {call, ?MODULE, stop_sequencer, [Run, gen_stop_method()]}}
|| S#state.is_setup] ++
[])).
%% Precondition, checked before a command is added to the command sequence.
precondition(S, {call, _, setup, _}) ->
not S#state.is_setup;
precondition(S, {call, _, _, _}) ->
S#state.is_setup.
%% Next state transformation, S is the current state and V is the result of the
%% command.
next_state(S, Res, {call, _, setup, [NumChains, ChainLen, PageSize, _SeqType]}) ->
S#state{is_setup=true,
num_chains=NumChains,
chain_len=ChainLen,
page_size=PageSize,
run=Res};
next_state(S, _, {call, _, append, _}) ->
S;
next_state(S, _, {call, _, read_approx, _}) ->
S;
next_state(S, _, {call, _, scan_forward, _}) ->
S;
next_state(S, _, {call, _, fill, _}) ->
S;
next_state(S, _, {call, _, trim, _}) ->
S;
next_state(S, _, {call, _, stop_sequencer, _}) ->
S.
eqeq(X, X) -> true;
eqeq(X, Y) -> {X, '/=', Y}.
postcondition(_S, {call, _, setup, _}, #run{} = _V) ->
true;
postcondition(_S, {call, _, append, _}, V) ->
case V of
{ok, LPN} when is_integer(LPN) -> true;
{special_trimmed, LPN} when is_integer(LPN) -> true;
error_badepoch -> true;
_ -> eqeq(V, todoTODO_fixit)
end;
postcondition(_S, {call, _, read_approx, _}, V) ->
valid_read_result(V);
postcondition(_S, {call, _, scan_forward, _}, V) ->
case V of
{ok, LastLSN, MoreP, Pages} ->
true = is_integer(LastLSN),
true = LastLSN > 0,
true = (MoreP == true orelse MoreP == false),
[] = lists:usort([X || {_LPN, Pg} <- Pages,
X <- [valid_read_result(Pg)], X /= true]),
true;
_ ->
eqeq(V, {todoTODO_fixit,?LINE})
end;
postcondition(_S, {call, _, FillTrim, _}, V)
when FillTrim == fill; FillTrim == trim ->
case V of
ok -> true;
error_trimmed -> true;
error_unwritten -> true;
error_overwritten -> true;
_ -> eqeq(V, {error, FillTrim, V})
end;
postcondition(_S, {call, _, stop_sequencer, _}, _V) ->
true.
valid_read_result(Pg) when is_binary(Pg) -> true;
valid_read_result(error_unwritten) -> true;
valid_read_result(error_trimmed) -> true;
valid_read_result(V) -> eqeq(V, {todoTODO_fixit,?LINE}).
run_commands_on_node(LocalOrSlave, Cmds, Seed) ->
AfterTime = if LocalOrSlave == local -> 50000;
LocalOrSlave == slave -> 1000000
end,
event_logger:start_link(),
pulse:start(),
delete_dir(?PROJECTION_DIR),
error_logger:tty(false),
error_logger:add_report_handler(handle_errors),
event_logger:start_logging(),
X =
try
{H, S, Res, Trace} = pulse:run(fun() ->
catch ets:new(?MY_TAB, [public, set, named_table]),
ets:insert(?MY_TAB, {?MY_KEY, undefined}),
%% application:start(my_test_app),
%% receive after AfterTime -> ok end,
{H, S, R} = run_parallel_commands(?MODULE, Cmds),
%% io:format(user, "Yooo: H = ~p\n", [H]),
%% io:format(user, "Yooo: S = ~p\n", [S]),
%% io:format(user, "Yooo: R = ~p\n", [R]),
receive after AfterTime -> ok end,
Trace = event_logger:get_events(),
%% receive after AfterTime -> ok end,
[{_, ThisRun}] = ets:lookup(?MY_TAB, ?MY_KEY),
[clean_up_runtime(ThisRun) || ThisRun /= undefined],
%% stop pulse controller *after* clean_up_runtime().
catch exit(pulse_application_controller, shutdown),
{H, S, R, Trace}
end, [{seed, Seed},
{strategy, unfair}]),
Schedule = pulse:get_schedule(),
Errors = gen_event:call(error_logger, handle_errors, get_errors, 60*1000),
{H, S, Res, Trace, Schedule, Errors}
catch
_:Err ->
{'EXIT', Err}
end,
X.
prop_pulse() ->
prop_pulse(local).
prop_pulse(LocalOrSlave) ->
?FORALL({NumChains, ChainLen, PageSize},
{choose(1, 3), choose(1, 3), choose(1, 16)},
begin
P = ?FORALL({Cmds, Seed},
{with_parameters([{num_chains, NumChains},
{chain_len, ChainLen},
{page_size, PageSize}], parallel_commands(?MODULE)),
pulse:seed()},
begin
case run_commands_on_node(LocalOrSlave, Cmds, Seed) of
{'EXIT', Err} ->
equals({'EXIT', Err}, ok);
{_H, S, Res, Trace, Schedule, Errors} ->
CheckTrace = check_trace(Trace, Cmds, Seed),
?WHENFAIL(
S = S, % ?QC_FMT("\nState: ~p\n", [S]),
measure(schedule, length(Schedule),
conjunction(
[{simple_result, equals(Res, ok)},
{errors, equals(Errors, [])},
{events, CheckTrace} ])))
end
end),
P
end).
prop_pulse_test_() ->
Timeout = case os:getenv("PULSE_TIME") of
false -> 60;
Val -> list_to_integer(Val)
end,
ExtraTO = case os:getenv("PULSE_SHRINK_TIME") of
false -> 0;
Val2 -> list_to_integer(Val2)
end,
io:format(user, "prop_pulse_test time: ~p + ~p seconds\n",
[Timeout, ExtraTO]),
{timeout, (Timeout+ExtraTO) + 60,
fun() ->
?assert(eqc:quickcheck(eqc:testing_time(Timeout,?QC_OUT(prop_pulse()))))
end}.
%% Example Trace0 (raw event info, from the ?LOG macro)
%%
%% [{32014,{call,<0.467.0>,{append,<<"O">>}}},
%% {32421,{call,<0.466.0>,{append,<<134>>}}},
%% {44522,{result,<0.467.0>,{ok,1}}},
%% {47651,{result,<0.466.0>,{ok,2}}}]
check_trace(Trace0, _Cmds, _Seed) ->
%% Let's treat this thing like a KV store. It is, mostly.
%% Key = LPN, Value = error_unwritten | {ok, Blob} | error_trimmed
%%
%% Problem: At {call, Pid, ...} time, we don't know what Key is!
%% We find out at {return, Pid, {ok, LSN}} time.
%% Also, the append might fail, so the model can ignore those
%% failures because they're not mutating any state that an
%% external viewer can see.
%% WARNING: Trace0 + lamport_clocks means Trace0 is not strictly sorted!
Trace = add_LPN_to_append_calls(lists:sort(Trace0)),
Events = eqc_temporal:from_timed_list(Trace),
%% Example Events, temporal style, 1 usec resolution, same as original trace
%%
%% [{0,32014,[]},
%% {32014,32015,[{call,<0.467.0>,{append,<<"O">>,will_be,1}}]},
%% {32015,32421,[]},
%% {32421,32422,[{call,<0.466.0>,{append,<<134>>,will_be,2}}]},
%% {32422,44522,[]},
%% {44522,44523,[{result,<0.467.0>,{ok,...}}]},
%% {44523,47651,[]},
%% {47651,47652,[{result,<0.466.0>,{ok,...}}]},
%% {47652,infinity,[]}]
Calls = eqc_temporal:stateful(
fun({call, _Pid, _Call} = I) -> [I] end,
fun({call, Pid, _Call}, {result, Pid, _}) -> [] end,
Events),
%% Example Calls (temporal map of when a call is in progress)
%%
%% [{0,32014,[]},
%% {32014,32421,[{call,<0.467.0>,{append,<<"O">>,will_be,1}}]},
%% {32421,44522,
%% [{call,<0.466.0>,{append,<<134>>,will_be,2}},{call,<0.467.0>,{append,<<"O">>,will_be,1}}]},
%% {44522,47651,[{call,<0.466.0>,{append,<<134>>,will_be,2}}]},
%% {47651,infinity,[]}]
AllLPNsR = eqc_temporal:stateful(
fun({call, _Pid, {append, _Pg, will_be, LPN}}) -> LPN;
({call, _Pid, {append, _Pg, will_fail, {special_trimmed, LPN}}}) -> LPN;
({call, _Pid, {read, LPN, _, _}}) -> LPN;
({call, _Pid, {fill, LPN, will_be, ok}}) -> LPN;
({call, _Pid, {trim, LPN, will_be, ok}}) -> LPN;
({call, _Pid, {goo_write, LPN, _Pg}}) -> LPN
end,
fun(x) -> [] end,
Calls),
%%io:format("Calls ~p\n", [Calls]),
%%io:format("AllLPNsR ~p\n", [AllLPNsR]),
%% The last item in the relation tells us what the final facts are in the
%% relation. In this case, it's all LPNs ever mentioned in the test run.
{_, infinity, AllLPNs} = lists:last(eqc_temporal:all_future(AllLPNsR)),
%% Use the following atoms to denote transitions ("Ttn") by an LPN:
%% w_0 = not written yet, error_unwritten
%% w_1 = written successfully, {ok, binary::()}
%% w_ft = fill trimmed, error_trimmed
%% w_tt = trim trimmed, error_trimmed
Mods = eqc_temporal:stateful(
fun({call, Pid, {append, Pg, will_be, LPN}}) ->
{mod_working, w_1, LPN, Pg, Pid};
({call, Pid, {append, Pg, will_fail, {special_trimmed, LPN}}}) ->
%% This is a special case for the model. We know that
%% a write raced with a trim and lost (at least some of
%% the time inside the chain). But the transition that
%% we model in this case is a special w_ type that is
%% treated specially by the dictionary-making
%% creation of the ValuesR relation.
{mod_working, w_special_trimmed, LPN, Pg, Pid};
({call, Pid, {fill, LPN, will_be, ok}}) ->
{mod_working, w_ft, LPN, fill, Pid};
({call, Pid, {trim, LPN, will_be, ok}}) ->
{mod_working, w_tt, LPN, trim, Pid};
({call, Pid, {read, LPN, will_fail, error_trimmed}}) ->
{mod_working, w_tt, LPN, read_repair_maybe, Pid}
end,
fun({mod_working, _Ttn, _LPN, _Pg, _Pid}, {result, _Pid, _Res})->
[]
end,
Events),
%% StartMod contains {mod_start, Ttn, LPN, V} when a modification started.
%% DoneMod contains {mod_end, Ttn, LPN, V} when a modification finished.
%% This is a clever trick: Mods contains the start & end timestamp
%% for each modification. Use shift() by 1 usec to move all timestamps
%% forward/backward 1 usec, then subtract away the original time range to
%% leave a 1 usec relation in time.
StartMod = eqc_temporal:map(
fun({mod_working, Ttn, LPN, Pg, _Pid}) ->
{mod_start, Ttn, LPN, Pg}
end,
eqc_temporal:subtract(Mods, eqc_temporal:shift(1, Mods))),
DoneMod = eqc_temporal:map(
fun({mod_working, Ttn, LPN, Pg, _Pid}) ->
{mod_end, Ttn, LPN, Pg}
end,
eqc_temporal:subtract(eqc_temporal:shift(1, Mods), Mods)),
StartsDones = eqc_temporal:union(StartMod, DoneMod),
%% TODO: A brighter mind than mine might figure out how to do this
%% next step using only eqc_temporal.
%%
%% We create a new relation, ValuesR. This relation contains
%% {values, OD::orddict()} for each time interval in the relation.
%% The OD contains all possible values for a particular LPN at
%% that time in the relation.
%% The key for OD is LPN, the value is an unordered list of possible values.
InitialValDict = orddict:from_list([{LPN, [error_unwritten]} ||
LPN <- AllLPNs]),
ValuesRFun =
fun({TS1, TS2, StEnds}, Dict1) ->
Dict2 = lists:foldl(
fun({mod_start, w_1, LPN, Pg}, D) ->
orddict:append(LPN, Pg, D);
({mod_start, WType, LPN, _Pg}, D)
when WType == w_ft; WType == w_tt ->
case lists:member(error_trimmed,
orddict:fetch(LPN, D)) of
true ->
D;
false ->
orddict:append(LPN, error_trimmed,D)
end;
({mod_start, w_special_trimmed, LPN, Pg}, D)->
orddict:append(LPN, Pg, D)
end, Dict1, [X || X={mod_start,_,_,_} <- StEnds]),
Dict3 = lists:foldl(
fun({mod_end, w_1, LPN, Pg}, D) ->
Vs1 = orddict:fetch(LPN, D),
%% We've written a page. error_unwritten is
%% now impossible; any other binary() is
%% also impossible. However, there may be
%% a trim operation that's still in flight!
Vs2 = [V || V <- Vs1, V /= error_unwritten,
not is_binary(V)],
orddict:store(LPN, [Pg|Vs2], D);
({mod_end, WType, LPN, _Pg}, D)
when WType == w_ft; WType == w_tt ->
orddict:store(LPN, [error_trimmed], D);
({mod_end, w_special_trimmed, LPN, Pg}, D) ->
orddict:store(LPN, [Pg,error_trimmed], D)
end, Dict2, [X || X={mod_end,_,_,_} <- StEnds]),
{{TS1, TS2, [{values, Dict3}]}, Dict3}
end,
{ValuesR, _} = lists:mapfoldl(ValuesRFun, InitialValDict, StartsDones),
InitialTtnDict = orddict:from_list([{LPN, [w_0]} || LPN <- AllLPNs]),
{TransitionsR, _} =
lists:mapfoldl(
fun({TS1, TS2, StEnds}, Dict1) ->
Dict2 = lists:foldl(
fun({mod_end, Ttn, LPN, _Pg}, D) ->
%% orddict does not discard duplicates
orddict:append(LPN, Ttn, D);
(_, D) ->
D
end, Dict1, [X || X={mod_end,_,_,_} <- StEnds]),
{{TS1, TS2, [{transitions, Dict2}]}, Dict2}
end, InitialTtnDict, StartsDones),
%% Checking reads is a tricky thing. My first attempt created a temporal
%% relation for the 1usec window when the read call was complete, then
%% union with the ValuesR relation to see what values were valid at that
%% particular instant. That approach fails sometimes!
%%
%% The reason is honest race conditions with a mutation: the model doesn't
%% know exactly when the data was written, so a valid value may have been
%% added to or removed from the ValuesR relation and not be present in the
%% 1usec window that intersects with ValuesR.
%%
%% Instead, we need to merge together all possible values from ValuesR
%% that appear at any time during the read op's lifetime.
PerhapsR = eqc_temporal:stateful(
fun({call, _Pid, {goo_write, LPN, Pg}}) ->
{perhaps, LPN, Pg}
end,
fun(x)-> [] end,
Events),
{_, _, Perhaps} = lists:last(eqc_temporal:all_future(PerhapsR)),
%%?QC_FMT("*Perhaps: ~p\n", [Perhaps]),
Reads = eqc_temporal:stateful(
fun({call, Pid, {read, LPN, _, _}}) ->
{read, Pid, LPN, []}
end,
fun({read, Pid, LPN, V1s}, {values, Values}) ->
{ok, V2s} = orddict:find(LPN, Values),
NewVs = lists:umerge(lists:sort(V1s),
lists:sort(V2s)),
%% Throw an exception (which is equivalent to a no-op)
%% if there are no differences: if we make multiples
%% of the exact same thing, stateful() will get confused.
false = NewVs == V1s,
{read, Pid, LPN, NewVs};
({read, Pid, LPN, Vs}, {result, Pid, Pg}) ->
%% case lists:member(Pg, Vs) orelse
%% lists:member({perhaps, LPN, Pg}, Perhaps) of
case lists:member(Pg, Vs) of
true ->
[];
false ->
case lists:member({perhaps, LPN, Pg}, Perhaps) of
true ->
%% The checking of the Perhaps list in
%% this manner is not strictly
%% temporally valid. It is possible
%% for the {perhaps,...} event to be
%% after the event we're checking here.
%% TODO work is to make this check 100%
%% temporally valid.
io:format(user, "Yo, found ~p ~p in Perhaps\n", [LPN, Pg]),
[];
false ->
[{bad, read, LPN, Pid, got, Pg,
possible, Vs}]
end
end
end, eqc_temporal:union(Events, ValuesR)),
BadFilter = fun(bad) -> true;
(Bad) when is_tuple(Bad), element(1, Bad) == bad -> true;
(_) -> false end,
BadReads = filter_relation_facts(BadFilter, Reads),
%% Property: For all LPNs, the transition list for K must be one of the
%% following four (4) acceptable transition orderings.
{_, _, [{transitions, FinalTtns}]} = lists:last(
eqc_temporal:all_future(TransitionsR)),
FinaTtns_filtered = filter_transition_trimfill_suffixes(FinalTtns),
InvalidTransitions = orddict:fold(
fun(_LPN, [w_0], Acc) ->
Acc;
(_LPN, [w_0,w_1], Acc) ->
Acc;
(_LPN, [w_0,'w_t+'], Acc) ->
Acc;
(_LPN, [w_0,w_1,'w_t+'], Acc) ->
Acc;
(LPN, BadTtns, Acc) ->
[{LPN, BadTtns}|Acc]
end, [], FinaTtns_filtered),
?WHENFAIL(begin
?QC_FMT("*Trace: ~p\n", [Trace]),
?QC_FMT("*ModsReads: ~p\n", [eqc_temporal:unions([Mods,Reads])]),
?QC_FMT("*InvalidTtns: ~p\n", [InvalidTransitions]),
?QC_FMT("*ValuesR: ~p\n", [eqc_temporal:unions([ValuesR, StartsDones])]),
?QC_FMT("*Calls: ~p\n", [Calls]),
?QC_FMT("*BadReads: ~p\n", [BadReads]),
?QC_FMT("*Perhaps: ~p\n", [Perhaps])
end,
conjunction(
[
{all_calls_finish,
eqc_temporal:is_false(eqc_temporal:all_future(Calls))},
{no_invalidTransitions,
InvalidTransitions == []},
{no_bad_reads,
eqc_temporal:is_false(eqc_temporal:all_future(BadReads))},
%% If you want to see PULSE causing crazy scheduling, then
%% change one of the "true orelse" -> "false orelse" below.
%% {bogus_no_gaps,
%% true orelse
%% (AppendLPNs == [] orelse length(range_ify(AppendLPNs)) == 1)},
%% {bogus_exactly_1_to_N,
%% true orelse (AppendLPNs == lists:seq(1, length(AppendLPNs)))},
{true, true}
])).
add_LPN_to_append_calls([{TS, {call, Pid, {append, Page}}}|Rest]) ->
Res = trace_lookahead_pid(Pid, Rest),
New = case Res of
{ok, LPN} ->
{TS, {call, Pid, {append, Page, will_be, LPN}}};
Else ->
{TS, {call, Pid, {append, Page, will_fail, Else}}}
end,
[New|add_LPN_to_append_calls(Rest)];
add_LPN_to_append_calls([{TS, {call, Pid, {OpName, LPN}}}|Rest])
when OpName == fill; OpName == trim ->
Res = trace_lookahead_pid(Pid, Rest),
New = case Res of
ok ->
{TS, {call, Pid, {OpName, LPN, will_be, ok}}};
Else ->
{TS, {call, Pid, {OpName, LPN, will_fail, Else}}}
end,
[New|add_LPN_to_append_calls(Rest)];
add_LPN_to_append_calls([{TS, {call, Pid, {read, LPN}}}|Rest]) ->
Res = trace_lookahead_pid(Pid, Rest),
New = case Res of
Page when is_binary(Page) ->
{TS, {call, Pid, {read, LPN, will_be, Page}}};
Else ->
{TS, {call, Pid, {read, LPN, will_fail, Else}}}
end,
[New|add_LPN_to_append_calls(Rest)];
add_LPN_to_append_calls([X|Rest]) ->
[X|add_LPN_to_append_calls(Rest)];
add_LPN_to_append_calls([]) ->
[].
trace_lookahead_pid(Pid, [{_TS, {result, Pid, Res}}|_]) ->
Res;
trace_lookahead_pid(Pid, [_H|T]) ->
trace_lookahead_pid(Pid, T).
%% Presenting command data statistics in a nicer way
command_data({set, _, {call, _, Fun, _}}, {_S, _V}) ->
Fun.
%% Convenience functions for running tests
test() ->
test({20, sec}).
test(N) when is_integer(N) ->
quickcheck(numtests(N, prop_pulse()));
test({Time, sec}) ->
quickcheck(eqc:testing_time(Time, prop_pulse()));
test({Time, min}) ->
test({Time * 60, sec});
test({Time, h}) ->
test({Time * 60, min}).
check() ->
check(current_counterexample()).
verbose() ->
verbose(current_counterexample()).
verbose(CE) ->
erlang:put(verbose, true),
Ok = check(CE),
erlang:put(verbose, false),
Ok.
check(CE) ->
check(on_output(fun("OK" ++ _, []) -> ok; (Fmt, Args) -> io:format(Fmt, Args) end,
prop_pulse(true == erlang:get(verbose))),
CE).
recheck() ->
recheck(prop_pulse()).
zipwith(F, [X|Xs], [Y|Ys]) ->
[F(X, Y)|zipwith(F, Xs, Ys)];
zipwith(_, _, _) -> [].
delete_dir(Dir) ->
corfurl_util:delete_dir(Dir).
clean_up_runtime(#run{flus=Flus, proj=P}) ->
%% io:format(user, "clean_up_runtime: run = ~p\n", [R]),
#proj{seq={Seq,_,_}} = P,
catch corfurl_sequencer:stop(Seq),
[catch corfurl_flu:stop(F) || F <- Flus],
corfurl_test:setup_del_all(length(Flus)),
delete_dir(?PROJECTION_DIR),
(catch exit(whereis(?SEQUENCER_NAME), kill)).
make_chains(ChainLen, FLUs) ->
make_chains(ChainLen, FLUs, [], []).
make_chains(_ChainLen, [], SmallAcc, BigAcc) ->
[lists:reverse(SmallAcc)|BigAcc];
make_chains(ChainLen, [H|T], SmallAcc, BigAcc) ->
if length(SmallAcc) == ChainLen ->
make_chains(ChainLen, T, [H], [lists:reverse(SmallAcc)|BigAcc]);
true ->
make_chains(ChainLen, T, [H|SmallAcc], BigAcc)
end.
setup(NumChains, ChainLen, PageSize, SeqType) ->
(catch exit(whereis(?SEQUENCER_NAME), kill)),
lamport_clock:init(),
N = NumChains * ChainLen,
FLUs = corfurl_test:setup_basic_flus(N, PageSize, ?MAX_PAGES),
{ok, Seq} = corfurl_sequencer:start_link(FLUs, SeqType),
Chains = make_chains(ChainLen, FLUs),
%% io:format(user, "Cs = ~p\n", [Chains]),
Proj = corfurl:new_simple_projection(?PROJECTION_DIR,
1, 1, ?MAX_PAGES, Chains),
ok = corfurl:save_projection(?PROJECTION_DIR, Proj),
error_overwritten = corfurl:save_projection(?PROJECTION_DIR, Proj),
1 = corfurl:latest_projection_epoch_number(?PROJECTION_DIR),
{ok, Proj} = corfurl:read_projection(?PROJECTION_DIR, 1),
Run = #run{proj=Proj#proj{seq={Seq, node(), ?SEQUENCER_NAME}},
flus=FLUs},
ets:insert(?MY_TAB, {?MY_KEY, Run}),
Run.
range_ify([]) ->
[];
range_ify(L) ->
[H|T] = lists:sort(L),
range_ify(H, H+1, T).
range_ify(Beginning, Next, [Next|T]) ->
range_ify(Beginning, Next+1, T);
range_ify(Beginning, Next, [Else|T]) ->
[{Beginning, to, Next-1}|range_ify(Else, Else+1, T)];
range_ify(Beginning, Next, []) ->
[{Beginning, to, Next-1}].
filter_relation_facts(FilterFun, R) ->
[{TS1, TS2, lists:filter(FilterFun, Facts)} || {TS1, TS2, Facts} <- R].
%% {TS1, TS2, Facts} <- Reads, Fact <- Facts, BadFilter(Fact)],
filter_transition_trimfill_suffixes(Ttns) ->
[{X, filter_1_transition_list(L)} || {X, L} <- Ttns].
filter_1_transition_list([]) ->
[];
filter_1_transition_list(Old) ->
%% Strategy: Chop off all of the w_* at the end, then look at **Old** to
%% see if we chopped off any. If we did chop off any, then add back a
%% constant 'w_t+' as a suffix.
New = lists:reverse(lists:dropwhile(fun(w_tt) -> true;
(w_ft) -> true;
(w_special_trimmed) -> true;
(_) -> false
end, lists:reverse(Old))),
Suffix = case lists:last(Old) of
w_ft -> ['w_t+'];
w_tt -> ['w_t+'];
w_special_trimmed -> ['w_t+'];
_ -> []
end,
New ++ Suffix.
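%% Illustrative example (not from the original source): a trailing run of
%% trim/fill transitions is collapsed into a single 'w_t+' suffix, while
%% other values are kept as-is:
%%
%%     filter_1_transition_list([w_1, w_tt, w_ft]) -> [w_1, 'w_t+']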
log_make_call(Tag) ->
log_make_call(self(), Tag).
log_make_call(Pid, Tag) ->
{call, Pid, Tag}.
log_make_result(Result) ->
log_make_result(self(), Result).
log_make_result(Pid, Result) ->
{result, Pid, Result}.
pick_an_LPN(#proj{seq={Seq,_,_}} = P, SeedInt) ->
case (catch corfurl_sequencer:get(Seq, 0)) of
{ok, Max} ->
%% The sequencer may be lying to us; oh well, nothing we can do about it.
if SeedInt > Max -> (SeedInt rem Max) + 1;
true -> SeedInt
end;
_Else ->
pick_an_LPN(corfurl_client:restart_sequencer(P), SeedInt)
end.
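%% Illustrative arithmetic (not from the original source): if the
%% sequencer reports Max = 5, then a seed of 17 wraps to
%% (17 rem 5) + 1 = 3, while a seed of 4 is returned unchanged.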
-define(LOG3(Tag, MkCall, PostCall),
begin
LOG__Start = lamport_clock:get(),
event_logger:event(log_make_call(Tag), LOG__Start),
LOG__Result = MkCall,
LOG__End = lamport_clock:get(),
PostCall,
event_logger:event(log_make_result(LOG__Result), LOG__End),
LOG__Result
end).
-define(LOG(Tag, MkCall), ?LOG3(Tag, MkCall, okqq)).
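%% Illustrative expansion (not from the original source; do_read/1 is a
%% hypothetical helper): ?LOG({read, LPN}, do_read(LPN)) records a
%% {call, Pid, {read, LPN}} event, evaluates do_read(LPN), then records
%% a {result, Pid, Result} event, both stamped with lamport_clock
%% timestamps, and finally returns the result.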
append(#run{proj=OriginalProj}, Page) ->
lamport_clock:init(),
lamport_clock:incr(),
Proj = get_projection(OriginalProj),
?LOG3({append, Page},
try
corfurl_client:pulse_tracing_start(write),
{Res, Proj2} = corfurl_client:append_page(Proj, Page),
put_projection(Proj2),
OtherPages0 = lists:usort(corfurl_client:pulse_tracing_get(write)),
OtherPages = case Res of
{ok, LPN} ->
OtherPages0 -- [LPN];
_ ->
OtherPages0
end,
put(zzzOtherPages, OtherPages),
perhaps_trip_append_page(?TRIP_no_append_duplicates, Res, Page)
catch X:Y ->
{caught, ?MODULE, ?LINE, X, Y, erlang:get_stacktrace()}
end,
try
OPages = get(zzzOtherPages),
%%if OPages /= [] -> io:format("OPages = ~w\n", [OPages]); true -> ok end,
GooPid = {self(), goo, now()},
[begin
event_logger:event(log_make_call(GooPid, {goo_write, OP, Page}),
LOG__Start),
event_logger:event(log_make_result(GooPid, who_knows),
LOG__End)
end || OP <- OPages]
catch XX:YY ->
exit({oops, ?MODULE, ?LINE, XX, YY, erlang:get_stacktrace()})
end).
read_result_mangle({ok, Page}) ->
Page;
read_result_mangle(Else) ->
Else.
read_approx(#run{proj=OriginalProj}, SeedInt) ->
lamport_clock:init(),
lamport_clock:incr(),
Proj = get_projection(OriginalProj),
LPN = pick_an_LPN(Proj, SeedInt),
?LOG({read, LPN},
try
{Res, Proj2} = corfurl_client:read_page(Proj, LPN),
put_projection(Proj2),
Res2 = read_result_mangle(Res),
perhaps_trip_read_approx(?TRIP_bad_read, Res2, LPN)
catch X:Y ->
{caught, ?MODULE, ?LINE, X, Y, erlang:get_stacktrace()}
end).
scan_forward(#run{proj=OriginalProj}, SeedInt, NumPages) ->
lamport_clock:init(),
lamport_clock:incr(),
Proj = get_projection(OriginalProj),
StartLPN = if SeedInt == 1 -> 1;
true -> pick_an_LPN(Proj, SeedInt)
end,
%% Our job is complicated by the ?LOG() macro, which isn't good enough
%% for our purpose: we must lie about the starting timestamp, to make
%% it appear as if each LPN result that scan_forward() gives us came
%% instead from a single-page read_page() call.
?LOG({scan_forward, StartLPN, NumPages},
try
TS1 = lamport_clock:get(),
case corfurl_client:scan_forward(Proj, StartLPN, NumPages) of
{{Res, EndLPN, MoreP, Pages}, Proj2}
when Res == ok; Res == error_badepoch ->
put_projection(Proj2),
PageIs = lists:zip(Pages, lists:seq(1, length(Pages))),
TS2 = lamport_clock:get(),
[begin
PidI = {self(), s_f, I},
event_logger:event(log_make_call(PidI, {read, LPN}),
TS1),
Pm = perhaps_trip_scan_forward(
?TRIP_bad_scan_forward, read_result_mangle(P),
EndLPN),
event_logger:event(log_make_result(PidI, Pm), TS2)
end || {{LPN, P}, I} <- PageIs],
Ps = [{LPN, read_result_mangle(P)} ||
{LPN, P} <- Pages],
{ok, EndLPN, MoreP, Ps}
end
catch X:Y ->
{caught, ?MODULE, ?LINE, X, Y, erlang:get_stacktrace()}
end).
fill(#run{proj=OriginalProj}, SeedInt) ->
lamport_clock:init(),
lamport_clock:incr(),
Proj = get_projection(OriginalProj),
LPN = pick_an_LPN(Proj, SeedInt),
?LOG({fill, LPN},
try
{Res, Proj2} = corfurl_client:fill_page(Proj, LPN),
put_projection(Proj2),
perhaps_trip_fill_page(?TRIP_bad_fill, Res, LPN)
catch X:Y ->
{caught, ?MODULE, ?LINE, X, Y, erlang:get_stacktrace()}
end).
trim(#run{proj=OriginalProj}, SeedInt) ->
lamport_clock:init(),
lamport_clock:incr(),
Proj = get_projection(OriginalProj),
LPN = pick_an_LPN(Proj, SeedInt),
?LOG({trim, LPN},
try
{Res, Proj2} = corfurl_client:trim_page(Proj, LPN),
put_projection(Proj2),
perhaps_trip_trim_page(?TRIP_bad_trim, Res, LPN)
catch X:Y ->
{caught, ?MODULE, ?LINE, X, Y, erlang:get_stacktrace()}
end).
stop_sequencer(#run{proj=OriginalProj}, Method) ->
Proj = get_projection(OriginalProj),
Seq = element(1,Proj#proj.seq),
try
corfurl_sequencer:stop(Seq, Method),
ok
catch _:_ ->
ok
end.
get_projection(OriginalProj) ->
case get(projection) of
undefined ->
OriginalProj;
Proj ->
Proj
end.
put_projection(Proj) ->
put(projection, Proj).
perhaps_trip_append_page(false, Res, _Page) ->
Res;
perhaps_trip_append_page(true, {ok, LPN}, _Page) when LPN > 3 ->
io:format(user, "TRIP: append_page\n", []),
{ok, 3};
perhaps_trip_append_page(true, Else, _Page) ->
Else.
perhaps_trip_read_approx(false, Res, _LPN) ->
Res;
perhaps_trip_read_approx(true, _Res, 3 = LPN) ->
io:format(user, "TRIP: read_approx LPN ~p\n", [LPN]),
<<"FAKE!">>;
perhaps_trip_read_approx(true, Res, _LPN) ->
Res.
perhaps_trip_scan_forward(false, Res, _EndLPN) ->
Res;
perhaps_trip_scan_forward(true, _Res, 10) ->
io:format(user, "TRIP: scan_forward\n", []),
<<"magic number bingo, you are a winner">>;
perhaps_trip_scan_forward(true, Res, _EndLPN) ->
Res.
perhaps_trip_fill_page(false, Res, _EndLPN) ->
Res;
perhaps_trip_fill_page(true, _Res, LPN) when 3 =< LPN, LPN =< 5 ->
io:format(user, "TRIP: fill_page\n", []),
ok; % can trigger both invalid ttn and bad read
perhaps_trip_fill_page(true, Res, _EndLPN) ->
Res.
perhaps_trip_trim_page(false, Res, _EndLPN) ->
Res;
perhaps_trip_trim_page(true, _Res, LPN) when 3 =< LPN, LPN =< 5 ->
io:format(user, "TRIP: trim_page\n", []),
ok;
perhaps_trip_trim_page(true, Res, _EndLPN) ->
Res.
-endif. % PULSE
-endif. % TEST

View file

@@ -1,98 +0,0 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(corfurl_sequencer_test).
-compile(export_all).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
-ifdef(PULSE).
-compile({parse_transform, pulse_instrument}).
-endif.
-endif.
-define(M, corfurl_sequencer).
-ifdef(TEST).
-ifndef(PULSE).
smoke_test() ->
BaseDir = "/tmp/" ++ atom_to_list(?MODULE) ++ ".",
PageSize = 8,
NumPages = 500,
NumFLUs = 4,
MyDir = fun(X) -> BaseDir ++ integer_to_list(X) end,
Del = fun() -> [ok = corfurl_util:delete_dir(MyDir(X)) ||
X <- lists:seq(1, NumFLUs)] end,
Del(),
FLUs = [begin
element(2, corfurl_flu:start_link(MyDir(X),
PageSize, NumPages*PageSize))
end || X <- lists:seq(1, NumFLUs)],
FLUsNums = lists:zip(FLUs, lists:seq(1, NumFLUs)),
try
[ok = corfurl_flu:write(FLU, 1, PageNum, <<42:(8*8)>>) ||
{FLU, PageNum} <- FLUsNums],
MLP0 = NumFLUs,
NumFLUs = ?M:get_max_logical_page(FLUs),
{ok, Sequencer} = ?M:start_link(FLUs),
try
{ok, _} = ?M:get(Sequencer, 5000),
[{Stream9, Tails9}] = StreamTails = [{9, [1125, 1124, 1123]}],
ok = ?M:set_tails(Sequencer, StreamTails),
{ok, _, [Tails9]} = ?M:get_tails(Sequencer, 0, [Stream9]),
{ok, LPN0a} = ?M:get(Sequencer, 2),
{ok, LPN0b} = ?M:get(Sequencer, 0),
LPN0a = LPN0b - 2,
{ok, LPN2a, _} = ?M:get_tails(Sequencer, 1, [2]),
{ok, LPN1a, _} = ?M:get_tails(Sequencer, 1, [1]),
{ok, _, [[LPN1a], [LPN2a]]} = ?M:get_tails(Sequencer,
0, [1,2]),
{ok, LPN2b, _} = ?M:get_tails(Sequencer, 1, [2]),
{ok, LPN2c, _} = ?M:get_tails(Sequencer, 1, [2]),
{ok, _, [[LPN1a], [LPN2c, LPN2b, LPN2a]]} =
?M:get_tails(Sequencer, 0, [1,2]),
{ok, LPN2d, _} = ?M:get_tails(Sequencer, 1, [2]),
{ok, LPN2e, _} = ?M:get_tails(Sequencer, 1, [2]),
{ok, LPNX, [[LPN1a], [LPN2e, LPN2d, LPN2c, LPN2b]]} =
?M:get_tails(Sequencer, 0, [1,2]),
{ok, LPNX, [[LPN1a], [LPN2e, LPN2d, LPN2c, LPN2b]]} =
?M:get_tails(Sequencer, 0, [1,2]), % same results
LPNX = LPN2e + 1, % no change with 0 request
ok
after
?M:stop(Sequencer)
end
after
[ok = corfurl_flu:stop(FLU) || FLU <- FLUs],
Del()
end.
-endif. % not PULSE
-endif. % TEST

View file

@@ -1,262 +0,0 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(corfurl_test).
-include("corfurl.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
-define(M, corfurl).
%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%%
setup_flu_basedir() ->
"./tmp." ++
atom_to_list(?MODULE) ++ "." ++ os:getpid() ++ ".".
setup_flu_dir(N) ->
setup_flu_basedir() ++ integer_to_list(N).
setup_del_all(NumFLUs) ->
[ok = corfurl_util:delete_dir(setup_flu_dir(N)) ||
N <- lists:seq(1, NumFLUs)].
setup_basic_flus(NumFLUs, PageSize, NumPages) ->
setup_del_all(NumFLUs),
[begin
element(2, corfurl_flu:start_link(setup_flu_dir(X),
PageSize, NumPages * (PageSize * ?PAGE_OVERHEAD)))
end || X <- lists:seq(1, NumFLUs)].
-ifndef(PULSE).
save_read_test() ->
Dir = "/tmp/" ++ atom_to_list(?MODULE) ++".save-read",
PDir = Dir ++ ".projection",
Chain = [a,b],
P1 = ?M:new_simple_projection(PDir, 1, 1, 1*100, [Chain]),
try
filelib:ensure_dir(Dir ++ "/ignored"),
ok = ?M:save_projection(Dir, P1),
error_overwritten = ?M:save_projection(Dir, P1),
{ok, P1} = ?M:read_projection(Dir, 1),
error_unwritten = ?M:read_projection(Dir, 2),
ok
after
ok = corfurl_util:delete_dir(Dir),
ok = corfurl_util:delete_dir(PDir)
end.
smoke1_test() ->
PDir = "./tmp.smoke1.projection",
NumFLUs = 6,
PageSize = 8,
NumPages = 10,
FLUs = [F1, F2, F3, F4, F5, F6] =
setup_basic_flus(NumFLUs, PageSize, NumPages),
{ok, Seq} = corfurl_sequencer:start_link(FLUs),
%% We know that the first LPN will be 1.
LPN_Pgs = [{X, list_to_binary(
lists:flatten(io_lib:format("~8..0w", [X])))} ||
X <- lists:seq(1, 5)],
try
P0 = ?M:new_simple_projection(PDir, 1, 1, 1*100,
[[F1, F2, F3], [F4, F5, F6]]),
P1 = P0#proj{seq={Seq, unused, unused}},
[begin {{ok, LPN}, _} = corfurl_client:append_page(P1, Pg) end || {LPN, Pg} <- LPN_Pgs],
[begin {ok, Pg} = ?M:read_page(P1, LPN) end || {LPN, Pg} <- LPN_Pgs],
[begin
LPNplus = LPN + 1,
{ok, LPNplus, true, [{LPN, Pg}]} = ?M:scan_forward(P1, LPN, 1)
end || {LPN, Pg} <- LPN_Pgs],
{ok, 6, false, []} = ?M:scan_forward(P1, 6, 1),
{ok, 6, false, []} = ?M:scan_forward(P1, 6, 10),
[{LPN1,Pg1}, {LPN2,Pg2}, {LPN3,Pg3}, {LPN4,Pg4}, {LPN5,Pg5}] = LPN_Pgs,
{ok, 4, true, [{LPN2,Pg2}, {LPN3,Pg3}]} = ?M:scan_forward(P1, 2, 2),
{ok, 6, false, [{LPN3,Pg3}, {LPN4,Pg4}, {LPN5,Pg5}]} =
?M:scan_forward(P1, 3, 10),
%% Let's smoke read-repair: regular write failure
Epoch = P1#proj.epoch,
Pg6 = <<424242:(PageSize*8)>>,
%% Simulate a failed write to the chain.
[F6a, F6b, F6c] = Chain6 = ?M:project_to_chain(6, P1),
NotHead6 = [F6b, F6c],
ok = ?M:write_page_to_chain([F6a], [F6a], Epoch, 6, Pg6, 1),
%% Does the chain look as expected?
{ok, Pg6} = corfurl_flu:read(?M:flu_pid(F6a), Epoch, 6),
[error_unwritten = corfurl_flu:read(?M:flu_pid(X), Epoch, 6) ||
X <- NotHead6],
%% Read repair should fix it.
{ok, Pg6} = ?M:read_page(P1, 6),
[{ok, Pg6} = corfurl_flu:read(?M:flu_pid(X), Epoch, 6) || X <- Chain6],
%% Let's smoke read-repair: failed fill
[F7a, F7b, F7c] = Chain7 = ?M:project_to_chain(7, P1),
NotHead7 = [F7b, F7c],
ok = corfurl_flu:fill(?M:flu_pid(F7a), Epoch, 7),
%% Does the chain look as expected?
error_trimmed = corfurl_flu:read(?M:flu_pid(F7a), Epoch, 7),
[error_unwritten = corfurl_flu:read(?M:flu_pid(X), Epoch, 7) ||
X <- NotHead7],
%% Read repair should fix it.
error_trimmed = ?M:read_page(P1, 7),
[error_trimmed = corfurl_flu:read(?M:flu_pid(X), Epoch, 7) || X <- Chain7],
%% scan_forward shouldn't see it either
{ok, 8, false, [{6,Pg6}]} = ?M:scan_forward(P1, 6, 10),
[F8a|_] = Chain8 = ?M:project_to_chain(8, P1),
ok = corfurl_flu:fill(?M:flu_pid(F8a), Epoch, 8),
%% No read before scan, scan_forward shouldn't see 8 either,
%% but the next seq should be 9
{ok, 9, false, [{6,Pg6}]} = ?M:scan_forward(P1, 6, 10),
ok
after
corfurl_util:delete_dir(PDir),
corfurl_sequencer:stop(Seq),
[corfurl_flu:stop(F) || F <- FLUs],
setup_del_all(NumFLUs)
end.
smoke_append_badepoch_test() ->
PDir = "./tmp.smoke2.projection",
NumFLUs = 6,
PageSize = 8,
NumPages = 10,
FLUs = [F1, F2, F3, F4, F5, F6] =
setup_basic_flus(NumFLUs, PageSize, NumPages),
{ok, Seq} = corfurl_sequencer:start_link(FLUs),
%% We know that the first LPN will be 1.
LPN_Pgs = [{X, list_to_binary(
lists:flatten(io_lib:format("~8..0w", [X])))} ||
X <- lists:seq(1, 5)],
try
LittleEpoch = 4,
BigEpoch = 42,
P0 = ?M:new_simple_projection(PDir, BigEpoch, 1, 1*100,
[[F1, F2, F3], [F4, F5, F6]]),
P1 = P0#proj{seq={Seq, unused, unused}},
[begin {{ok, LPN}, _} = corfurl_client:append_page(P1, Pg) end || {LPN, Pg} <- LPN_Pgs],
[{ok, _} = corfurl_flu:seal(FLU, BigEpoch) || FLU <- FLUs],
{_LPN, Pg} = hd(LPN_Pgs),
{error_badepoch, _} = corfurl_client:append_page(P1, Pg),
P2 = P1#proj{epoch=LittleEpoch},
{error_badepoch, _} = corfurl_client:append_page(P2, Pg),
ok
after
corfurl_util:delete_dir(PDir),
corfurl_sequencer:stop(Seq),
[corfurl_flu:stop(F) || F <- FLUs],
setup_del_all(NumFLUs)
end.
-ifdef(TIMING_TEST).
forfun_test_() ->
{timeout, 99999, fun() ->
[forfun(Procs) || Procs <- [10,100,1000,5000]]
end}.
forfun_append(0, _P, _Page) ->
ok;
forfun_append(N, #proj{seq={Seq, _, _}} = P, Page) ->
{ok, _} = ?M:append_page(Seq, P, Page),
forfun_append(N - 1, P, Page).
%%% My MBP, SSD
%%% The 1K and 5K procs shows full-mailbox-scan ickiness
%%% when getting replies from prim_file. :-(
%%% forfun: 10 procs writing 200000 pages of 8 bytes/page to 2 chains of 4 total FLUs in 10.016815 sec
%%% forfun: 100 procs writing 200000 pages of 8 bytes/page to 2 chains of 4 total FLUs in 10.547976 sec
%%% forfun: 1000 procs writing 200000 pages of 8 bytes/page to 2 chains of 4 total FLUs in 13.706686 sec
%%% forfun: 5000 procs writing 200000 pages of 8 bytes/page to 2 chains of 4 total FLUs in 33.516312 sec
%%% forfun: 10 procs writing 200000 pages of 8 bytes/page to 4 chains of 4 total FLUs in 5.350147 sec
%%% forfun: 100 procs writing 200000 pages of 8 bytes/page to 4 chains of 4 total FLUs in 5.429485 sec
%%% forfun: 1000 procs writing 200000 pages of 8 bytes/page to 4 chains of 4 total FLUs in 5.643233 sec
%%% forfun: 5000 procs writing 200000 pages of 8 bytes/page to 4 chains of 4 total FLUs in 15.686058 sec
%%%% forfun: 10 procs writing 200000 pages of 4096 bytes/page to 2 chains of 4 total FLUs in 13.479458 sec
%%%% forfun: 100 procs writing 200000 pages of 4096 bytes/page to 2 chains of 4 total FLUs in 14.752565 sec
%%%% forfun: 1000 procs writing 200000 pages of 4096 bytes/page to 2 chains of 4 total FLUs in 25.012306 sec
%%%% forfun: 5000 procs writing 200000 pages of 4096 bytes/page to 2 chains of 4 total FLUs in 38.972076 sec
forfun(NumProcs) ->
PDir = "./tmp.forfun.projection",
io:format(user, "\n", []),
NumFLUs = 4,
PageSize = 8,
%%PageSize = 4096,
NumPages = 200*1000,
PagesPerProc = NumPages div NumProcs,
FLUs = [F1, F2, F3, F4] = setup_basic_flus(NumFLUs, PageSize, NumPages),
{ok, Seq} = corfurl_sequencer:start_link(FLUs),
try
Chains = [[F1, F2], [F3, F4]],
%%Chains = [[F1], [F2], [F3], [F4]],
P0 = ?M:new_simple_projection(PDir, 1, 1, NumPages*2, Chains),
P = P0#proj{seq={Seq, unused, unused}},
Me = self(),
Start = now(),
Ws = [begin
Page = <<X:(PageSize*8)>>,
spawn_link(fun() ->
forfun_append(PagesPerProc, P, Page),
Me ! {done, self()}
end)
end || X <- lists:seq(1, NumProcs)],
[receive {done, W} -> ok end || W <- Ws],
End = now(),
io:format(user, "forfun: ~p procs writing ~p pages of ~p bytes/page to ~p chains of ~p total FLUs in ~p sec\n",
[NumProcs, NumPages, PageSize, length(Chains), length(lists:flatten(Chains)), timer:now_diff(End, Start) / 1000000]),
ok
after
corfurl_util:delete_dir(PDir),
corfurl_sequencer:stop(Seq),
[corfurl_flu:stop(F) || F <- FLUs],
setup_del_all(NumFLUs)
end.
-endif. % TIMING_TEST
-endif. % not PULSE
-endif. % TEST

View file

@@ -1,133 +0,0 @@
%%% File : event_logger.erl
%%% Author : Ulf Norell
%%% Description :
%%% Created : 26 Mar 2012 by Ulf Norell
-module(event_logger).
-compile(export_all).
-behaviour(gen_server).
%% API
-export([start_link/0, event/1, event/2, get_events/0, start_logging/0]).
-export([timestamp/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-record(state, { start_time, events = [] }).
-record(event, { timestamp, data }).
%%====================================================================
%% API
%%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server
%%--------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
start_logging() ->
gen_server:call(?MODULE, {start, timestamp()}).
event(EventData) ->
event(EventData, timestamp()).
event(EventData, Timestamp) ->
gen_server:call(?MODULE,
#event{ timestamp = Timestamp, data = EventData }).
async_event(EventData) ->
gen_server:cast(?MODULE,
#event{ timestamp = timestamp(), data = EventData }).
get_events() ->
gen_server:call(?MODULE, get_events).
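%% Illustrative usage (not part of the original module), assuming the
%% server has already been started with start_link/0:
%%
%%     ok = event_logger:start_logging(),
%%     ok = event_logger:event(foo),
%%     [{_TS, foo}] = event_logger:get_events().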
%%====================================================================
%% gen_server callbacks
%%====================================================================
%%--------------------------------------------------------------------
%% Function: init(Args) -> {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% Description: Initiates the server
%%--------------------------------------------------------------------
init([]) ->
{ok, #state{}}.
%%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) ->
%% {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} |
%% {stop, Reason, State}
%% Description: Handling call messages
%%--------------------------------------------------------------------
handle_call(Event = #event{}, _From, State) ->
{reply, ok, add_event(Event, State)};
handle_call({start, Now}, _From, S) ->
{reply, ok, S#state{ events = [], start_time = Now }};
handle_call(get_events, _From, S) ->
{reply, lists:reverse([ {E#event.timestamp, E#event.data} || E <- S#state.events]),
S#state{ events = [] }};
handle_call(Request, _From, State) ->
{reply, {error, {bad_call, Request}}, State}.
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% Description: Handling cast messages
%%--------------------------------------------------------------------
handle_cast(Event = #event{}, State) ->
{noreply, add_event(Event, State)};
handle_cast(_Msg, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% Description: Handling all non call/cast messages
%%--------------------------------------------------------------------
handle_info(_Info, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> void()
%% Description: This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any necessary
%% cleaning up. When it returns, the gen_server terminates with Reason.
%% The return value is ignored.
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
ok.
%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Convert process state when code is changed
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
add_event(#event{timestamp = Now, data = Data}, State) ->
Event = #event{ timestamp = Now, data = Data },
State#state{ events = [Event|State#state.events] }.
timestamp() ->
lamport_clock:get().

View file

@@ -1,153 +0,0 @@
%%%-------------------------------------------------------------------
%%% @author Hans Svensson <>
%%% @copyright (C) 2012, Hans Svensson
%%% @doc
%%%
%%% @end
%%% Created : 19 Mar 2012 by Hans Svensson <>
%%%-------------------------------------------------------------------
-module(handle_errors).
-behaviour(gen_event).
%% API
-export([start_link/0, add_handler/0]).
%% gen_event callbacks
-export([init/1, handle_event/2, handle_call/2,
handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-record(state, { errors = [] }).
%%%===================================================================
%%% gen_event callbacks
%%%===================================================================
%%--------------------------------------------------------------------
%% @doc
%% Creates an event manager
%%
%% @spec start_link() -> {ok, Pid} | {error, Error}
%% @end
%%--------------------------------------------------------------------
start_link() ->
gen_event:start_link({local, ?SERVER}).
%%--------------------------------------------------------------------
%% @doc
%% Adds an event handler
%%
%% @spec add_handler() -> ok | {'EXIT', Reason} | term()
%% @end
%%--------------------------------------------------------------------
add_handler() ->
gen_event:add_handler(?SERVER, ?MODULE, []).
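%% Illustrative note (an assumption, not from the original source): the
%% handle_event/2 clauses below match error_logger-style report tuples,
%% so a test harness would typically attach this handler to the standard
%% 'error_logger' event manager, e.g.:
%%
%%     ok = gen_event:add_handler(error_logger, handle_errors, []).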
%%%===================================================================
%%% gen_event callbacks
%%%===================================================================
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever a new event handler is added to an event manager,
%% this function is called to initialize the event handler.
%%
%% @spec init(Args) -> {ok, State}
%% @end
%%--------------------------------------------------------------------
init([]) ->
{ok, #state{}}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever an event manager receives an event sent using
%% gen_event:notify/2 or gen_event:sync_notify/2, this function is
%% called for each installed event handler to handle the event.
%%
%% @spec handle_event(Event, State) ->
%% {ok, State} |
%% {swap_handler, Args1, State1, Mod2, Args2} |
%% remove_handler
%% @end
%%--------------------------------------------------------------------
handle_event({error, _, {_, "Hintfile '~s' has bad CRC" ++ _, _}}, State) ->
{ok, State};
handle_event({error, _, {_, "** Generic server" ++ _, _}}, State) ->
{ok, State};
handle_event({error, _, {_, "Failed to merge ~p: ~p\n", [_, not_ready]}}, State) ->
{ok, State};
handle_event({error, _, {_, "Failed to merge ~p: ~p\n", [_, {merge_locked, _, _}]}}, State) ->
{ok, State};
handle_event({error, _, {_, "Failed to read lock data from ~s: ~p\n", [_, {invalid_data, <<>>}]}}, State) ->
{ok, State};
handle_event({error, _, Event}, State) ->
{ok, State#state{ errors = [Event|State#state.errors] }};
handle_event(_Event, State) ->
{ok, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever an event manager receives a request sent using
%% gen_event:call/3,4, this function is called for the specified
%% event handler to handle the request.
%%
%% @spec handle_call(Request, State) ->
%% {ok, Reply, State} |
%% {swap_handler, Reply, Args1, State1, Mod2, Args2} |
%% {remove_handler, Reply}
%% @end
%%--------------------------------------------------------------------
handle_call(get_errors, S) ->
{ok, S#state.errors, S#state{ errors = [] }};
handle_call(_Request, State) ->
Reply = ok,
{ok, Reply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% This function is called for each installed event handler when
%% an event manager receives any other message than an event or a
%% synchronous request (or a system message).
%%
%% @spec handle_info(Info, State) ->
%% {ok, State} |
%% {swap_handler, Args1, State1, Mod2, Args2} |
%% remove_handler
%% @end
%%--------------------------------------------------------------------
handle_info(_Info, State) ->
{ok, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever an event handler is deleted from an event manager, this
%% function is called. It should be the opposite of Module:init/1 and
%% do any necessary cleaning up.
%%
%% @spec terminate(Reason, State) -> void()
%% @end
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
ok.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Convert process state when code is changed
%%
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
%% @end
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================

View file

@@ -1,48 +0,0 @@
-module(lamport_clock).
-export([init/0, get/0, update/1, incr/0]).
-define(KEY, ?MODULE).
-ifdef(TEST).
init() ->
case get(?KEY) of
undefined ->
%% {Ca, Cb, _} = now(),
%% FakeTOD = ((Ca * 1000000) + Cb) * 1000000,
FakeTOD = 0,
put(?KEY, FakeTOD + 1);
N when is_integer(N) ->
ok
end.
get() ->
get(?KEY).
update(Remote) ->
New = erlang:max(get(?KEY), Remote) + 1,
put(?KEY, New),
New.
incr() ->
New = get(?KEY) + 1,
put(?KEY, New),
New.
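%% Illustrative example (not from the original source): if the local
%% clock is currently 5, update(9) advances it to max(5, 9) + 1 = 10,
%% and a following incr() returns 11.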
-else. % TEST
init() ->
ok.
get() ->
ok.
update(_) ->
ok.
incr() ->
ok.
-endif. % TEST

View file

@@ -23,9 +23,6 @@
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
-ifdef(PULSE).
-compile({parse_transform, pulse_instrument}).
-endif.
-endif.
-define(D(X), io:format(user, "Dbg: ~s = ~p\n", [??X, X])).

View file

@@ -27,15 +27,13 @@
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
-ifdef(PULSE).
-compile({parse_transform, pulse_instrument}).
-endif.
-endif.
-define(SEQ, corfurl_sequencer).
-define(T, tango).
-define(D(X), io:format(user, "Dbg: ~s =\n ~p\n", [??X, X])).
-define(D(X), ok).
%% -define(D(X), io:format(user, "Dbg: ~s =\n ~p\n", [??X, X])).
-ifdef(TEST).
-ifndef(PULSE).