Compare commits

...

227 commits

Author SHA1 Message Date
Gregory Burd
cc807a97d6 Add new mmap config option. 2014-01-15 10:45:04 -05:00
Gregory Burd
ea99493ea3 Compenstate for LSM config API changes 2013-12-09 12:54:58 -05:00
Gregory Burd
08b2d18463 Fix checkpoint config. 2013-11-20 13:10:28 -05:00
Gregory Burd
3302ab26ed Use the develop branch for now. 2013-11-19 14:16:26 -05:00
Gregory Burd
db2daf99b2 Default logging off. 2013-11-19 14:16:01 -05:00
Gregory Burd
68d9ed942b Update to WiredTiger 1.6.6 2013-11-18 20:54:59 -05:00
Gregory Burd
36faa4e713 Forgot to remove second use of checkpoint setting. 2013-11-18 20:50:26 -05:00
Gregory Burd
e560185420 When logging enable checkpoints, even when using LSM. 2013-11-18 20:46:22 -05:00
Gregory Burd
448c0b555c Update config to match latest available options. 2013-10-30 15:00:41 -04:00
Gregory Burd
634bcd188a Integrate new configuration options available in WiredTiger. 2013-10-30 14:50:14 -04:00
Gregory Burd
1664fdcf8c API for handlers in WiredTiger changed to include session state, update our use of the API to match that change. 2013-10-30 13:11:14 -04:00
Gregory Burd
95515f111c Merge pull request #11 from basho-labs/gsb-2.0-fixes
Changes related to Riak 2.0 and an issue with how statistics were gathered from the backend
2013-10-30 08:53:49 -07:00
Gregory Burd
ac2c5caeff Change a few default configs and comment out the stats gathering for now. 2013-10-30 11:50:20 -04:00
Gregory Burd
75305dae94 Minor updates. 2013-10-12 21:48:05 -04:00
Gregory Burd
7d0ad2dce1 Update the version strings and a few config values which changed names. 2013-10-02 14:42:26 -04:00
Gregory Burd
84a85bbe38 Open a *statistics* cursor when gathering statistics. 2013-10-02 14:41:36 -04:00
Gregory Burd
9d2896016b A few build automation changes/fixes. 2013-10-02 14:41:06 -04:00
Gregory Burd
17585a99b1 priv now has the schema file in it, so be more specific with what we ignore in that dir 2013-10-02 14:38:41 -04:00
Gregory Burd
942e51b753 OS/X uses ".dylib" rather than ".so" for shared libraries (because it's
special) so I've worked around that.  Also tightened up some tests so that
we're not rebuilding the libraries when not necessary.
2013-09-06 09:54:55 -04:00
Gregory Burd
c60fa22422 Retry three times, then bail out and return not found. 2013-09-04 13:12:37 -04:00
Gregory Burd
48419ce4d0 Start the penalty after queues are 25% full because a) that makes sense, and b)
that avoids some odd badarith errors when PctBusy is very small.
2013-08-21 14:19:52 -04:00
Gregory Burd
2ddf0da53e Use malloc/free rather than enif_alloc/enif_free so as to avoid BEAM allocator
overhead (bytes and time).  Create static references to commonly used Erlang
atoms to avoid overhead re-creating them on each request cycle.
2013-08-21 12:20:19 -04:00
Gregory Burd
83c3faf74f Use malloc/free rather than enif_alloc/enif_free so as to avoid BEAM allocator
overhead (bytes and time).
2013-08-21 12:18:24 -04:00
Gregory Burd
2043e8ccc6 Because the build decends into the ext/compressors/snappy directory the
relative paths won't find system/include, so use the absolute paths instead.
2013-08-21 12:17:18 -04:00
Gregory Burd
33c8e53ccf Update to latest release of WiredTiger. Also, make sure Snappy builds before WiredTiger. 2013-08-21 12:16:24 -04:00
Gregory Burd
1bf66ae960 Every enqueued request now includes a hint as to how much work is pending in
the lower C-code.  We use that to scale the reduction count penalty so that we
can (hopefully) signal to the Erlang scheduler enough information for it to
properly throttle work.  'eagain' should only happen when queues are full, we
have no choice but to keep this calling proc busy in a recursive loop trying
the request over and over if we're going to preserve request ordering.
2013-08-21 12:15:34 -04:00
Gregory Burd
e67da86a9b Change backpressure method from EAGAIN to bump_reductions so as not to block Riak/KV vnode processes when queues backup. 2013-08-19 13:32:58 -04:00
Gregory Burd
2047104cda Remove the sleep from async_nif's EAGAIN path because it doesn't seem to have a positive effect. 2013-08-19 12:20:36 -04:00
Gregory Burd
96d43d5d17 Re-use the unchanging value of 'Args' rather than including it in every recursive call. 2013-08-02 14:22:30 -04:00
Gregory Burd
05c8c615ef I think the make_ref() needs to be within the fun()'s context to trigger selective receive optimization in the beam's runtime. 2013-08-01 10:02:21 -04:00
Gregory Burd
f153509409 With some input from Jon I've managed to reduce this back into a macro rather than a fun and a macro calling a fun. He also suggested that on eagain I sleep a small amount of time so as to allow other work to catch up a bit. 2013-07-31 15:39:55 -04:00
Gregory Burd
ee904b4769 Lower the queue size to shrink potential for latency in queue. Remove earlier idea that more queues would lead to more even worker progress, just have 1 queue per Erlang-scheduler thread (generally, 1 per CPU core available). Also change the way worker threads decide when to cond_wait or migrate to other queues looking for work. 2013-07-31 15:06:28 -04:00
Gregory Burd
c9a4ab8325 Revert changes to async_nif and re-enable stats. Fixed selective recv. 2013-07-31 09:41:36 -04:00
Gregory Burd
2393257bef Really disable stats. 2013-07-30 14:34:04 -04:00
Gregory Burd
211ffd884c Ignore requests for stats for right now. 2013-07-30 14:30:04 -04:00
Gregory Burd
4418a74183 Increase the number of queues for work to reside. Worker threads, once started, don't exit until shutdown. 2013-07-30 14:21:26 -04:00
Gregory Burd
1623d5293c Increase the max queue size. 2013-07-30 13:30:43 -04:00
Gregory Burd
56c2ac27c2 Revert to a macro-only, non-recursive on eagain method for managing requests. 2013-07-30 13:27:13 -04:00
Gregory Burd
27dba903ef The ref needs to be in-scope of the recieve for it to be optimized. 2013-07-30 13:20:49 -04:00
Gregory Burd
8f415df69c Merge pull request #10 from basho-labs/gsb-workers-migrate
Worker threads should check for work in other queues before exiting.
2013-07-26 17:12:15 -07:00
Gregory Burd
cce163db9f Fix potential to use uninitialized value when branching. 2013-07-26 20:08:49 -04:00
Gregory Burd
9a5defd8c9 Merge remote-tracking branch 'origin/master' into gsb-workers-migrate
Conflicts:
	c_src/async_nif.h
2013-07-26 10:31:23 -04:00
Gregory Burd
452d7694a6 Added some sanity checking of key/value sizes. Check for EAGAIN/INVAL/NOMEM when starting worker threads. Switch back to the 1.6.3 release branch of WT. 2013-07-26 10:27:21 -04:00
Gregory Burd
3627ff8690 Ensure that on EAGAIN we continue to try to spawn a worker. When workers finish with a queue have them migrate to the other queues looking for work. 2013-07-25 13:29:16 -04:00
Gregory Burd
122963133a Seems logging isn't a valid config value anymore, so remove it. 2013-07-18 13:26:53 -04:00
Gregory Burd
2a847b82d0 Forgot to remove this when I dumped the MRU. 2013-07-18 13:21:22 -04:00
Gregory Burd
2694cc1dba Remove the MRU, it wasn't really effective in most cases anyway and complicated logic (and was buggy). For now the cache mutex will be hot, but eventually I hope to move the cache to a lock-free dequeue. khash.h and cas.h aren't used anymore, so they have been removed. 2013-07-18 13:14:54 -04:00
Gregory Burd
bbadc81d53 Queue depth and num workers can race, so make sure that we start at least one worker when there are none active for that queue. 2013-07-15 16:51:08 -04:00
Gregory Burd
c3d3d39c36 Remove default setting from configuration. 2013-07-15 12:36:28 -04:00
Gregory Burd
bd0323af7a Update to WiredTiger 1.6.3. Fix a condition where a mutex was unlocked twice on eagain when queues were all full. 2013-07-15 12:21:10 -04:00
Gregory Burd
420b658e27 Really fix the lower bound for session handles (session_max) to 1024 (upper bound is 8192). 2013-07-08 19:59:00 -04:00
Gregory Burd
fea52c4ec3 Change lower bound for session handles (session_max) to 1024 (upper bound is 8192). 2013-07-08 19:45:45 -04:00
Gregory Burd
ac835f7617 Reduce precision of bloom filters to something more reasonable to avoid having very large bloom filters in cache. Reduce leaf pages to a more reasonable default. Given all data in Riak is <<Bucket, Key>> enable prefix_compression to hopefully reduce key overhead. 2013-07-08 13:49:44 -04:00
Gregory Burd
565f95f9b5 Cleanup makefile a bit 2013-07-08 11:12:50 -04:00
Gregory Burd
b6e8b175e3 Merge pull request #9 from basho-labs/gsb-ctx-cache
Cache session/[{cursor, config}] for reuse and spawn threads when needed.
2013-07-03 05:31:14 -07:00
Gregory Burd
bc0f5dbfc7 Evict older half of items in the cache by removing items from the end of the list, don't waste cycles computing timestamps. 2013-07-02 22:23:32 -04:00
Gregory Burd
b727538162 Fix shutdown 2013-07-02 22:07:34 -04:00
Gregory Burd
2672bab3ea Stats overhead due to hitting the clock and pulling a mutex caused a massive slowdown so now work is assigned to a new queue only when the candidate queue is deeper than the average of the other queues and threads are created only when the depth of the queue is larger than the number of threads working on that queue. 2013-07-02 19:58:00 -04:00
Gregory Burd
00e5889ac9 Changed conditions for worker thread creation. 2013-07-02 16:46:04 -04:00
Gregory Burd
4300b3036f Working on triggers that start/stop worker threads. 2013-07-01 21:09:21 -04:00
Gregory Burd
c7b45a7c2b Still ironing out stats. 2013-06-27 10:57:41 -04:00
Gregory Burd
c41e411a92 Worker threads come and go as needed with a lower bound of 2 and an
upper bound of ASYNC_NIF_MAX_WORKERS.  Stats were improved to use
thread local storage for measures.  With stats working again wterl
uses them to determine who to evict.  Wterl's signature calculation
for an operation wasn't correct and so the cache wasn't efficient at
all, this has been fixed.
2013-06-25 13:31:43 -04:00
Gregory Burd
a3c54b1610 Cleanup a bit. 2013-06-19 14:54:27 -04:00
Gregory Burd
060abffcff Minor oversight, fixed. 2013-06-19 14:42:56 -04:00
Gregory Burd
0f180a6531 Fixed a few mistakes. 2013-06-19 14:37:30 -04:00
Gregory Burd
450299dc2d Comment out a test that's not yet working. 2013-06-18 16:40:43 -04:00
Gregory Burd
4ae8ffb4cd Update debugging messages a bit. Fix a bug in the signature function. 2013-06-18 13:49:25 -04:00
Gregory Burd
34e88c9234 Add some debugging output. 2013-06-18 13:12:10 -04:00
Gregory Burd
50e24d0f48 Add a longer, multi-table test. Use a release version of WiredTiger from now on. 2013-06-18 09:21:58 -04:00
Gregory Burd
53307e8c01 A great deal of cleanup. EUnit and EQC tests pass. 2013-06-14 16:57:53 -04:00
Gregory Burd
ff7d1d6e20 WIP: further simplifying context cache 2013-06-14 10:52:45 -04:00
Gregory Burd
7952358781 WIP: cache wasn't returning items found 2013-06-12 09:08:09 -04:00
Gregory Burd
4460434db1 WIP: remove potential for infinite loops with CAS and fix a few issues in async 2013-06-12 08:09:51 -04:00
Gregory Burd
110b482962 Some paranoia and a few fixes 2013-06-11 12:13:06 -04:00
Gregory Burd
8ea866bf20 Logic inversion on CAS() operation. 2013-06-10 16:18:55 -04:00
Gregory Burd
2a4b8ee7d2 WIP: simplify the cache from hash-of-lists to list; use a CAS() operation to protect the most-recently-used (mru) list. 2013-06-10 14:31:59 -04:00
b2c0b65114 Fixes for OS/X and whitespace cleanup. 2013-06-06 15:16:50 -04:00
Gregory Burd
0fef28de92 WIP: basho_bench tests are running fine now, need more work to ensure cache is functioning properly. 2013-06-05 11:41:41 -04:00
Gregory Burd
778ba20352 WIP: a bit of cleanup fixes a few mistakes 2013-06-04 17:21:50 -04:00
Gregory Burd
f1b7d8322d WIP: replcae the kbtree with khash, we don't need the tree features (yet, if ever) and hash is faster; add a most-recently-used stash for contexts as it's highly likely that worker threads will do many operations with the same shape/signature of session/cursors/tables/config and that path can be lock-free as well making it much faster (one would hope); somewhere something is stepping on read-only ErlNifBinary data and so a crc check is failing and causing the runtime to abort, that's the latest item to find/fix. 2013-06-04 14:45:23 -04:00
Gregory Burd
b002294c4e WIP: all tests (but drop) passing again, need to fix that and valgrind next. 2013-05-31 20:32:02 -04:00
Gregory Burd
9468870e1f WIP: use a log2 histogram to track how long items live in the cache, evict items which have been in cache greater than the mean time. 2013-05-30 17:10:51 -04:00
Gregory Burd
15fbc71ea7 WIP: pieces in place, need to work out the kinks now. 2013-05-30 14:21:34 -04:00
Gregory Burd
f0d5baeb0e WIP: more caching work, still not done. 2013-05-29 14:57:35 -04:00
Gregory Burd
a2cd1d562c WIP: devising a better way to cache/reuse session/cursor pairs. 2013-05-28 16:14:19 -04:00
Gregory Burd
013251e6d9 Update WiredTiger statistics less frequently and fix a build script
mistake preventing checking out by branch name.
2013-05-14 13:51:44 -04:00
Gregory Burd
4745950ad5 Another TODO item. 2013-05-02 11:29:37 -04:00
Gregory Burd
98fffd199f Updated todo list. 2013-05-02 08:41:44 -04:00
Gregory Burd
786142ce73 Add a bit of statistics tracking for two reasons, a) to help inform
where a request should be enqueded and b) to track request latency.
2013-05-01 22:02:37 -04:00
Gregory Burd
ae64a5e26f Move async nif struct definition back to where it belongs. 2013-05-01 22:02:21 -04:00
Gregory Burd
d1d648af1e Now matches against proper library name and prevents rebuilding when unnecessary. 2013-05-01 22:00:43 -04:00
Gregory Burd
7c9c3b78db Remove this vestige of bzip2 support. 2013-05-01 21:59:54 -04:00
Gregory Burd
12fd9134fa Add a few placeholders for missing functions. 2013-05-01 21:59:12 -04:00
Gregory Burd
0bde789af5 Tune the LSM a bit based on testing, increase the chunk size and add more merge
worker threads into the mix.
2013-05-01 07:06:13 -04:00
Gregory Burd
d93f0bc162 A more friendly approach to make clean in c_src. 2013-04-26 18:57:17 -04:00
Gregory Burd
bfc180eeb3 Merge branch 'master' of github.com:basho-labs/wterl
Conflicts:
	c_src/wiredtiger-build.patch
2013-04-26 10:37:06 -04:00
Gregory Burd
f1cb6f8c61 Remove support for bzip2. Change build_deps.sh to only remove the
build files from wiredtiger when cleaning rather than all the files.
2013-04-26 10:34:57 -04:00
Gregory Burd
eafee02865 Only start 2 * num_queues worker threads initially. num_queues is generally
equal to the number of cores reported by Erlang (info.scheduler_threads) which
is either determined automatically by the Erlang BEAM runtime or via the +S
flag.  The minimum num_queues is 2, so the minimum number of workers is 4.  The
maximum number of workers is ASYNC_NIF_MAX_WORKER_QUEUE_SIZE (currently set to
128), but that would only happen if there were 64 cores (or you set +S 64:64 at
startup).
2013-04-26 10:15:15 -04:00
Gregory Burd
c410312c40 Switch to the develop branch. 2013-04-26 10:07:10 -04:00
Gregory Burd
3a51afcc20 Copy paste error in patch file prevented proper build arguments for bzip2. 2013-04-25 16:36:17 -04:00
Gregory Burd
422dcfda89 Return 'eagain' when request queue is full and then try the request again.
In the worst case is the request queue remains full and we loop between
the NIF and Erlang forever trying over and over to enqueue the request. If
that happens we shouldn't take schedulers offline as the NIF calls are fast
and we shouldn't run out of memory as that is bounded.  CPU will show a lot
of activity, but progress will continue in Erlang.
2013-04-25 15:18:23 -04:00
Gregory Burd
f043a99ccb Merge pull request #8 from basho-labs/gsb-recycle-reqs
Reuse req and ErlNifEnv rather than re-alloc/use/free them for every request.
2013-04-25 10:26:16 -07:00
Gregory Burd
6b393ac47c Keep allocated req and ErlNifEnv around for reuse rather than re-alloc'ing them on each request should save us some overhead on the hot path. 2013-04-25 11:30:11 -04:00
Gregory Burd
35e36d1cae Whitespace 2013-04-24 14:28:24 -04:00
Gregory Burd
4076a1923c Driver and config for basho_bench testing. 2013-04-24 14:27:56 -04:00
Gregory Burd
54c5158490 Merge pull request #7 from basho-labs/gsb-event-handlers
Deliver WiredTiger event notifications (messages, errors, progress) into Erlang for logging
2013-04-23 12:02:22 -07:00
Gregory Burd
46243ff54a Remove debug compiler flags. 2013-04-23 15:00:44 -04:00
Gregory Burd
35de23c0f3 Include the gen_server module that will log events using lager. 2013-04-23 08:22:48 -04:00
Gregory Burd
6f58faaba2 Turn on some of the verbose options, but leave most off. 2013-04-22 20:55:45 -04:00
Gregory Burd
88416d1991 Merge pull request #6 from basho-labs/gsb-async-nifs3
Execute NIF calls on non-scheduler threads asynchronously
2013-04-22 17:54:56 -07:00
Gregory Burd
fc59ae3f90 WIP: Finished transitioning to a gen_server and a few other tasks. This works
except that when trying to enif_send a bunch of messages eventually one causes
a segv just after reporting: size_object: bad tag for 0x80
2013-04-22 17:45:48 -04:00
Gregory Burd
652771003e WIP: a good start, I need to switch over wterl_event_handler to be a
gen_server and I need to add a way to set the pid of the message handler
process to the NIF API.
2013-04-22 09:52:21 -04:00
Gregory Burd
3398540831 Have Erlang and C compilers be a bit more picky. 2013-04-21 11:16:04 -04:00
Gregory Burd
ac59cd69dd Check return from kn_init. WiredTiger truncate doesn't require you to position cursors for the range to be truncated. 2013-04-21 11:14:48 -04:00
Gregory Burd
fae6831580 Ensure that the env is clear when signaling shutdown. 2013-04-21 11:11:17 -04:00
Gregory Burd
a282ad2f1d Improve, extend some targets. 2013-04-21 11:10:06 -04:00
Gregory Burd
6c74c825e4 Avoid calling a process that may have already exited. 2013-04-20 13:30:09 -04:00
Gregory Burd
48141346f0 Remove redundant wt from filenames. 2013-04-20 12:49:14 -04:00
Gregory Burd
a1459ce607 It's statistics:<your table name here> not table:statistics (oops). 2013-04-20 12:33:36 -04:00
Gregory Burd
b69f35364b Use the less onerous version of statistics. 2013-04-20 12:27:42 -04:00
Gregory Burd
7380bd3bd7 Open the statistics table, not the actual table. 2013-04-20 12:08:55 -04:00
Gregory Burd
bfe56136d8 * Be sure to release the reqs mutext on shutdown. 2013-04-20 08:28:38 -04:00
Gregory Burd
554aba4331 Truncate table isn't used by the WiredTiger Riak/KV backend API and it is
currently failing.  Comment out of the tests, for now.
2013-04-20 08:20:45 -04:00
Gregory Burd
3310129918 * Check to make sure that all ASYNC_NIF_REPLY() calls are at the end of their
blocks, just before return and after releasing any local resources or locks.

* Check the return value of when setting up cursor caches so as not to miss an
ENOMEM or other error.

*Cleanup and free resources when closing a connection handle.

* Add a few missing mutex unlock calls on error paths.

* Ensure all resources are free'd/released/closed during truncate error paths.

* Free up alloc'ed copies of keys, cursor handles and sessions on unload.
2013-04-20 07:38:11 -04:00
Gregory Burd
01a79a08c0 Add a comment and remove an mutext unlock on error because this function is not
where the lock is acquired.
2013-04-20 06:13:41 -04:00
Gregory Burd
be95229af9 On error when fetching stats, ignore and return whatever you have so far. 2013-04-19 16:44:51 -04:00
Gregory Burd
17cfdcd04f Fix default tuple. 2013-04-19 16:07:18 -04:00
Gregory Burd
e9145c344c Ignore eperm on status cursors and empty cursors (for now). 2013-04-19 15:46:58 -04:00
Gregory Burd
5d3cb9997a For now we ignore eperm and ebusy error returns on fold and drop. 2013-04-19 15:09:29 -04:00
Gregory Burd
b936b99531 Minor fix. 2013-04-19 15:00:57 -04:00
Gregory Burd
d505f7f9c8 Whitespace. Call wterl:drop not truncate for drop calls and set force
to true when calling so that we ignore ENOENT.  Change the session estimate
down a bit.
2013-04-19 14:55:32 -04:00
Gregory Burd
40bdda15bb Create links for bzip2 shared libraries. 2013-04-19 12:33:20 -04:00
Gregory Burd
7b1dcb507d Need to copy shared library links for bzip2 as well (duh). 2013-04-19 11:14:03 -04:00
Gregory Burd
203fa1a54d Merge branch 'gsb-async-nifs3' of github.com:basho-labs/wterl into gsb-async-nifs3 2013-04-19 11:06:25 -04:00
Gregory Burd
01a8926160 Shared libs and files linking to them must be copied into our priv
dir so that at runtime we can find the correct files.
2013-04-19 11:03:40 -04:00
Gregory Burd
8d8ceecc8b enif_get_string can return < 1 when it copies less than the buffer size
you pass into it, that'd result in a non-zero (aka true) test when in fact
it's a problem if the argument isn't passed completely (however unlikely
that is).

enif_alloc_env() requires that later you enif_free_env() which I wasn't doing,
this seems to keep memory steady in test runs.
2013-04-19 09:11:41 -04:00
Gregory Burd
7a1e683a47 Working toward a more automated valgrind. 2013-04-18 17:07:27 -04:00
Gregory Burd
95d8a28453 Remove unused num_contexts count. Cleanup the session/cursor cache
init process.
2013-04-18 13:36:24 -04:00
Gregory Burd
728d2281e0 Ensure that we init the shared cache when asking for a cursor before
asking for a session.
2013-04-18 12:24:49 -04:00
Gregory Burd
846f7f72ba Must compile before commit, must compile before commit... 2013-04-18 12:01:34 -04:00
Gregory Burd
7a148b9f19 get, put, delete don't use sessions, only cursors so don't fetch the
shared session.
2013-04-18 11:59:10 -04:00
Gregory Burd
83bca853f3 Store copies of Uri as keys in the hash table. 2013-04-18 11:47:12 -04:00
Gregory Burd
60dd048b7e Move the FIFO Queue implementation into its own file (fifo_q.h). Work
on the nif_unload path.  Free up resources owned by wterl.c when
unloading.  Continue to evolve the build script.  Add to khash the ability
to create a hash that maps from a pointer to a value. There is still a segv
due to a race wterl.c:do_unload() which needs to be addressed.
2013-04-18 10:37:36 -04:00
Gregory Burd
db953f5b39 Moved num_queue estimate earlier so as to ensure that the amount of
memory allocated, zero'ed and free'd was consistent.  Skip free'ing
async environment as Erlang will free that for us when we no longer
reference it.  Fix the memory leak (or at least one of them) by no
longer copying the Uri into the hash table.
2013-04-17 18:26:59 -04:00
Gregory Burd
1ae8e5698f Ensure that the ratio of workers to queues is 2:1 and that there are at
least 2 queues regardless.  Fix a few race conditions (h/t Sue from
WiredTiger for some nice work) and cherry pick (for now) a commit that
fixes a bug I triggered and Keith fixed (in < 10min from report) related
to WiredTiger stats.  Ensure that my guesstimate for session_max is no
larger than WiredTiger can manage.  Continue to fiddle with the build
script.
2013-04-17 16:48:23 -04:00
Gregory Burd
f664c303d8 Disable prefix compression (for now). 2013-04-17 14:23:56 -04:00
Gregory Burd
123dfa600e Simplified the worker look function. Added ability to pick block
compressor in config, default is snappy, off is {block_compressor, none}.
2013-04-17 13:19:06 -04:00
Gregory Burd
87f70d75a1 Inline the fifo_q functions to speed them up and silence compiler warnings
for unused API calls.  Add a fifo_q_full call to hide the details of that.
Alloc work queues along with the async_nif at the end of that memory block.
Fix a few places where things should be free'd and were not.  Change enqueue
to return 0 when shutting down.  Fix a race related to shutdown.  When I use
gdb eunit calls ?cmd() seem to fail, so I've created rmdir:path() to replace
?cmd("rm -rf path") calls.
2013-04-17 11:17:13 -04:00
Gregory Burd
1913e7fdf5 Continue to iterate on the build system to accomodate shared libs. 2013-04-16 21:46:53 -04:00
Gregory Burd
5e80477d4a Merge remote-tracking branch 'origin/gsb-async-nifs3' into gsb-async-nifs3 2013-04-16 19:34:11 -04:00
Gregory Burd
13a9036764 So, WiredTiger prefers that extensions be shared libraries however those
extensions reference other shared libraries.  Previously I tried to turn
all those shared libs into static libs, however that didn't pan out so well.
So now the libwiredtiger.so library is loaded by wterl.so also, later on
during use libwiredtiger.so will load libwiredtiger_snappy.so and
libwiredtiger_bzip2.so which now have libsnappy.a and libbz2.a respectively
linked statically so as to avoid confusion.
2013-04-16 19:31:10 -04:00
Gregory Burd
76d7a7d0a3 Revert a small change 2013-04-16 17:20:57 -04:00
Gregory Burd
2d077a0baf Explicitly close cursors we open despite knowing they'll be closed with the session later. 2013-04-16 17:15:23 -04:00
Gregory Burd
3dab6a2dc5 Clean up 2013-04-16 17:09:34 -04:00
Gregory Burd
36c1d3f829 Move things around in the build script to get a bit of reuse. 2013-04-16 16:24:04 -04:00
Gregory Burd
bd4d852a19 Remove wildcards from the patch file names and fix checkout of wiredtiger repo to pull and track remote branch. 2013-04-16 17:41:00 +00:00
Gregory Burd
249c600554 Switching back to static linking to avoid platform issues and have
confidence when in deployment of what code is being used.
2013-04-16 11:47:04 -04:00
Gregory Burd
941bb0a929 Whitespace 2013-04-16 11:46:51 -04:00
Gregory Burd
ba41dd7fb6 Use the table name in get/put/delete calls to form an "affinity" with a
worker queue so that we spread work around and make it more likely that
work for a given table goes first to a given set of worker threads.
2013-04-15 18:46:06 -04:00
Gregory Burd
371779d14e Return to alloc'ed requests because there may be many more in flight
than those in the various queues.  Reenable the (still failing)
truncate tests (because they don't SEGV anymore).  Still might be
a memory leak, next up is valgrind.
2013-04-15 17:37:14 -04:00
Gregory Burd
668109de25 Added match/gt/lt atom return for cursor_search_near() call. Changed
the request queue over to a simple fifo queue which could (if needed)
be made lock-free.  Cursor searches can optionally now specifiy that
they are mid-scan so as not to have their cursor handles reset every
call.
2013-04-15 15:22:12 -04:00
Gregory Burd
e37c5b68d9 Type-o fixed. 2013-04-15 00:25:00 -04:00
Gregory Burd
cc73310cb3 Missed a change. 2013-04-15 00:11:48 -04:00
Gregory Burd
5ba491adfa We don't use the scheduler id from Erlang anymore in async_nif so
stop sending it over.  Allow the user to set a "type" of storage
in their config to either 'table' for btree or 'lsm' for a log
structured merge tree.  Various other cleanup.
2013-04-15 00:08:01 -04:00
Gregory Burd
9ed2137730 A bit of cleanup 2013-04-14 19:33:14 -04:00
Super-User
15a9a70c41 bzero isn't portable, use memset instead. Include bzip2 and snappy source for use by WT block compression
e
2013-04-14 21:23:57 +00:00
Gregory Burd
22643fc312 Change from waking up all threads to waking up one thread. 2013-04-14 13:54:45 -04:00
Gregory Burd
8fe9dc9bad This is a different approach from async2, instead of a single queue
and many workers this has a queue per scheduler and a few workers per
queue.
2013-04-14 08:44:54 -04:00
Gregory Burd
456129e7f3 Reusing closed cursors is bad for your health (SEGV), don't do that.
Also a bit of paranoia, bzero after all enif_alloc calls.
2013-04-12 16:59:10 -04:00
Gregory Burd
1962640382 Move the cleanup code into conn_close and remove the need for a dtor
function for resource GC/cleanup.  It's easier.
2013-04-12 16:09:00 -04:00
Gregory Burd
b0ca1e4098 WIP -- 41 tests pass, 3 fail = progress! Right now I have the connection
dtor commented out, otherwise it would SEGV on GC.  Some of the truncate
tests fail (race?) but don't SEGV, so that's not so bad.  Fixed numerous
issues and also removed a mutex and queue of idle worker threads because
it isn't used so why bother with it?
2013-04-12 15:25:56 -04:00
Gregory Burd
5a73264307 WIP -- Most tests passing, truncate and drop failing due to EBUSY,
also not sure why there is a segv when exiting but not in gdb.
Unlock mutex before returning error codes.  Alloc/free copies of some
strings allocated on the stack.
2013-04-11 11:57:41 -04:00
Gregory Burd
5c0295624d WIP -- most tests running, a few commented out, sometimes a segv on exit 2013-04-08 17:21:48 -04:00
Gregory Burd
07061ed6e8 WIP -- more tests passing, plenty left to fix 2013-04-07 22:16:44 -04:00
Gregory Burd
3b41805a71 WIP -- continue to fix tests in wterl 2013-04-07 10:07:19 -04:00
Gregory Burd
5ac006630e WIP -- tests failing -- WIP
Fixed numerous bugs.  Pushed config_to_bin down to just before
calling the _nif() function, everywhere else config is a proplist.
2013-04-07 09:21:47 -04:00
Gregory Burd
2675aa501a Opening a connection now involves passing in both a configuration
string for the connection and for any sessions.  Don't convert
configuration proplists into strings (Erlang binaries) until calling
into the NIF layer.
2013-04-06 17:19:59 -04:00
Gregory Burd
07592f20fb Use non-cursor ops for get/put/delete because under the covers they
are reusing sessions and cursors in the NIF layer.  Keep two cursors
open for just the house keeping tasks, is_empty and stats.
2013-04-06 17:17:50 -04:00
Gregory Burd
f7219dcaec WIP: Fix minor C89 issue 2013-04-06 11:14:23 -04:00
Gregory Burd
b4f82a388d WIP-- Compiling, not yet tested/functional -- WIP
Changes required to iron out compiler errors, warnings, etc.  Code now
compiles with clang or gcc.
2013-04-06 11:05:41 -04:00
Gregory Burd
19268b7c77 WIP-- Not Yet Compiling or Functional -- WIP
* No longer expose WT_SESSION into Erlang at all as WT's model is to
maintain one WT_SESSION per-thread and we don't know anything about
threads in Erlang.

* async_nif worker threads don't pull both mutexes on every loop
when processing requests, only one

* async_nif provides a worker_id (int, 0 - MAX_WORKERS) within the
work block scope which we use to find our per-worker WT_SESSIONs

* async_nif maintained a number of globals which I'm moving into
the NIF's priv_data so that on upgrade/reload we have a fighting
chance to "Do the Right Thing(TM)".

* NIF Upgrades/Reloads started to plumb this in.

* Use a khash to manage the cache of URI->WT_CURSORs per WT_SESSION.

* Added start/stop positions into truncate call to allow for truncating
sub-ranges data.

* Lots of other details I'm sure I've forgotten and more left undone.
Search for "TODO:" or try to compile to see what's left, and then
there is a need for a lot more tests given all this new complexity.
2013-04-05 18:09:54 -04:00
Gregory Burd
163a5073cb An updated Emacs cmode leads to all sorts of whitespace fixes intermixed with
some merge changes.
2013-04-02 09:42:07 -04:00
Gregory Burd
9c379a21dd Ignore a bit more. 2013-04-02 09:39:05 -04:00
Gregory Burd
254891e776 Bumping reductions manually will not be supported in future releases of the
BEAM and has dubious value anyway.  Remove that and ignore queue depth for
now.
2013-04-02 09:37:49 -04:00
Gregory Burd
609676917d Rename function to match new API. 2013-04-02 09:36:34 -04:00
Gregory Burd
6aa51437cc Merge remote-tracking branch 'origin/master' into gsb-async-nifs2
Conflicts:
	c_src/wterl.c
	src/riak_kv_wterl_backend.erl
	src/wterl.erl
2013-04-02 09:33:41 -04:00
Gregory Burd
a75d60bf27 Add an RPATH to the extionsion shared libs so they can find objects in the priv/ directory at runtime 2013-04-01 16:17:18 -04:00
Gregory Burd
95a6f52a1d Whitespace 2013-04-01 14:00:06 -04:00
Gregory Burd
e3219ff2c5 Add linker flags so that shared libraries are found relative to either the wterl layout or the Riak layout. 2013-03-28 22:21:36 -04:00
Gregory Burd
1595ce3096 Automatically load all WiredTiger extension shared libraries found in the priv dir at load time. 2013-03-28 10:45:24 -04:00
Gregory Burd
3ef48e0ac0 Change from static linking to dynamic. Enable snappy and bzip2 compression. 2013-03-28 10:24:26 -04:00
Gregory Burd
46214daa06 Remove check for other module. 2013-03-28 10:23:56 -04:00
Gregory Burd
77e0dacdb5 Use a set of {Session, Cursor} pairs ("passes") for calls into WiredTiger
to avoid a) opening a large number of Sessions or Cursors and b) avoid
reusing a Session/Cursor pair in more than one thread.  Each operation,
except folds, use the Session/Cursor pair corresponding to the scheduler
id (roughly "core") they are running on.  Each vnode instance will have
(count(schedulers) + count(active folds)) Session/Cursor pairs in use so
be sure to set session_max large enough when opening the connection to
WiredTiger.  This method will cause problems with scheduler threads going
to "sleep" so be sure to run schedmon to wake them up.
2013-03-26 14:17:21 -04:00
Gregory Burd
78e667bc09 Depend on async_nif.h 2013-03-26 13:02:45 -04:00
Gregory Burd
9f4e08ca6e Queue and execute work from scheduler threads on other threads to prevent schedulers from sleeping. 2013-03-24 21:00:48 -04:00
Gregory Burd
85b84a5343 Folds need a separate cursor, but not a separate session, so use
the shared connection and session.
2013-03-21 21:39:47 -04:00
Gregory Burd
9834f54991 Cover a few more corner cases when encoding cache size and other
values to config strings.
2013-03-21 21:05:01 -04:00
Gregory Burd
9302def7cc Lower cache size guess from 1/3 to 1/4 of available memory. Finally correct lsm_bloom_new/oldest config. 2013-03-21 18:56:01 -04:00
Gregory Burd
0203b06a61 Cursor delete operation only requires cursor handle and key (no need for a value) 2013-03-21 18:44:13 -04:00
Gregory Burd
a844dc5846 Whitespace 2013-03-21 14:37:37 -04:00
Gregory Burd
e02eb8ef9f Don't log the cache size guess. 2013-03-21 14:37:24 -04:00
Gregory Burd
50d44ecedf Move LSM config to the table open options where they belong. 2013-03-21 14:35:53 -04:00
Gregory Burd
6225f9fde1 Share cursors managed stored in the module's ets table. 2013-03-21 14:34:54 -04:00
Gregory Burd
9ab363adfe Indention 2013-03-21 14:12:03 -04:00
Gregory Burd
b905cce833 Add lsm_bloom_{newest,oldest}=true to set of defaults. 2013-03-19 15:29:13 -04:00
Gregory Burd
f9fcf305ae Revert list change. 2013-03-19 14:51:22 -04:00
Gregory Burd
8ed1c4000e Additional LSM Tree config parameters. 2013-03-19 14:34:01 -04:00
Gregory Burd
33e66ec4e5 Minor change to array construction. 2013-03-19 14:33:38 -04:00
Gregory Burd
39fb4ff710 Pass config into connection:open() rather than ignoring it and do a
better job of integrating config options from the app environment as
well as config file while we're at it.
2013-03-18 15:32:03 -04:00
Gregory Burd
24527cf453 Break out connection and session startup into functions. 2013-03-17 14:04:31 -04:00
Gregory Burd
ce79d22659 Revert shared cursor changes for now. 2013-03-17 08:15:33 -04:00
Gregory Burd
1129241fe0 Only open a session/cursor when first needed. There is still a race
between vnodes during startup, ideally wterl_conn would only be opened
once.
2013-03-16 23:28:13 -04:00
Gregory Burd
22d42f5076 Fixed issue reusing cursors. 2013-03-16 16:42:43 -04:00
Gregory Burd
ab45690a4b Pull via http to avoid certificate errors with https on certain platforms
(Illumos, SmartOS, Solaris... I'm looking at you.) Speed up builds by
running make in parallel (use all the cores!).
2013-03-16 16:41:26 -04:00
Gregory Burd
fab5877ff5 Mistakenly allowed a file rename from another branch here. Use "wterl"
not just "wt" for the shared library name.
2013-03-14 17:36:45 -04:00
Gregory Burd
3543b42df8 Whitespace fixes (s/tab/space/g) and added two new configuration settings
from the 1.5.0 release.  Checkpoint every second, dump stats every 30 sec.
2013-03-14 15:26:28 -04:00
Steve Vinoski
85fadeec70 Merge pull request #5 from basho-labs/gsb-cache-size-est
Estimate a reasonable cache size if one isn't provided in app/env config for wterl.
2013-03-14 11:05:38 -07:00
Gregory Burd
ef3e161f12 Embelish rebar's config a bit. 2013-03-14 13:33:34 -04:00
Gregory Burd
1d6dfee192 Cache sizing is critical for performance. This change guesses at a
reasonable setting for the WiredTiger cache size at runtime.  This cache
is shared across all vnodes regarless of how many are active at any
given time.  The algorithm is: max(1GB, 1/3 (RAM - Beam RSS size)). We don't
enable direct_io on purpose and data will be double buffered in WiredTiger's
cache and the filesystem buffer cache.  This turns out to be faster than
direct I/O despite wasting a bit of RAM.
2013-03-14 13:27:59 -04:00
Steve Vinoski
efdeb70764 Merge pull request #4 from basho-labs/gsb-shared-cursors
Share per-table cursors for get/put/delete operations.
2013-03-13 15:15:35 -07:00
Gregory Burd
e4b3acbd11 Share per-table cursors for get/put/delete operations. 2013-03-13 17:27:00 -04:00
Steve Vinoski
f94336f3eb base WiredTiger max session config setting on ring size 2013-03-12 18:24:06 -04:00
Gregory Burd
021dcb10f5 Build from source pulled from a specific branch on GitHub for "basho"
related work and integration of WiredTiger rather than the latest
.tar.bz2 file they distribute.
2013-03-12 17:16:15 -04:00
Gregory Burd
789c44b133 Change the config to use a lsm tree rather than btree for tables.
Stop using direct_io as it forces operations to sync more often, slowing
things down, at the expense of double-buffering (this will use more RAM).
2013-03-12 17:16:15 -04:00
Gregory Burd
c6eac27ea7 Start with a more rational default configuration. (Before you ask... the
answer is 'no').  So far there has been no effort to validate that these
settings are in fact the best for Riak/KV or CS data access patterns.  These
particular settings are, at best, an educated guess based on past experience,
the docs and reading about the benchmark the WiredTiger team published here:

https://github.com/wiredtiger/wiredtiger/wiki/YCSB-Mapkeeper-benchmark
2013-03-12 17:16:15 -04:00
Gregory Burd
9a02718a2e Pass configuration along when opening sessions. 2013-03-12 17:16:15 -04:00
Gregory Burd
b063d866cc Merge pull request #2 from basho-labs/gsb-makefile
Add some more depth to the targets.
2013-03-07 17:24:29 -08:00
28 changed files with 5387 additions and 1120 deletions

4
.gdbinit Normal file
View file

@ -0,0 +1,4 @@
handle SIGPIPE nostop noprint pass
#b erl_nif.c:1203
#b sys/unix/erl_unix_sys_ddll.c:234

10
.gitignore vendored
View file

@ -1,8 +1,14 @@
*.beam
.eunit
ebin
priv/*.so
c_src/system
c_src/wiredtiger-*/
c_src/wiredtiger*/
c_src/*.o
c_src/bzip2-1.0.6
c_src/snappy-1.0.4
deps/
priv/wt
priv/*.so*
priv/*.dylib*
log/
*~

2
AUTHORS Normal file
View file

@ -0,0 +1,2 @@
-author('Steve Vinoski <steve@basho.com>').
-author('Gregory Burd <steve@basho.com>'). % greg@burd.me @gregburd

143
Makefile
View file

@ -1,24 +1,72 @@
TARGET= wterl
# Copyright 2012 Erlware, LLC. All Rights Reserved.
#
# This file is provided to you under the Apache License,
# Version 2.0 (the "License"); you may not use this file
# except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
REBAR= ./rebar
#REBAR= /usr/bin/env rebar
ERL= /usr/bin/env erl
DIALYZER= /usr/bin/env dialyzer
# Source: https://gist.github.com/ericbmerritt/5706091
.PHONY: plt analyze all deps compile get-deps clean
ERLFLAGS= -pa $(CURDIR)/.eunit -pa $(CURDIR)/ebin -pa $(CURDIR)/deps/*/ebin
all: compile
DEPS_PLT=$(CURDIR)/.deps_plt
DEPS=erts kernel stdlib
deps: get-deps
# =============================================================================
# Verify that the programs we need to run are installed on this system
# =============================================================================
ERL = $(shell which erl)
get-deps:
@$(REBAR) get-deps
ifeq ($(ERL),)
$(error "Erlang not available on this system")
endif
REBAR=$(shell which rebar)
ifeq ($(REBAR),)
$(error "Rebar not available on this system")
endif
DIALYZER=$(shell which dialyzer)
ifeq ($(DIALYZER),)
$(error "Dialyzer not available on this system")
endif
TYPER=$(shell which typer)
ifeq ($(TYPER),)
$(error "Typer not available on this system")
endif
.PHONY: all compile doc clean test dialyzer typer shell distclean pdf \
update-deps clean-common-test-data rebuild
all: deps compile
# =============================================================================
# Rules to build the system
# =============================================================================
deps:
$(REBAR) get-deps
$(REBAR) compile
update-deps:
@$(REBAR) update-deps
$(REBAR) update-deps
$(REBAR) compile
c_src/wterl.o:
c_src/wterl.o: c_src/async_nif.h
touch c_src/wterl.c
ebin/app_helper.beam:
@ -27,33 +75,52 @@ ebin/app_helper.beam:
@/bin/false
compile: c_src/wterl.o ebin/app_helper.beam
@$(REBAR) compile
$(REBAR) skip_deps=true compile
doc:
$(REBAR) skip_deps=true doc
eunit: compile clean-common-test-data
$(REBAR) skip_deps=true eunit
test: compile eunit
$(DEPS_PLT):
@echo Building local plt at $(DEPS_PLT)
@echo
dialyzer --output_plt $(DEPS_PLT) --build_plt \
--apps $(DEPS) -r deps
dialyzer: $(DEPS_PLT)
$(DIALYZER) --fullpath --plt $(DEPS_PLT) -Wrace_conditions -r ./ebin
typer:
$(TYPER) --plt $(DEPS_PLT) -r ./src
xref:
$(REBAR) xref skip_deps=true
# You often want *rebuilt* rebar tests to be available to the shell you have to
# call eunit (to get the tests rebuilt). However, eunit runs the tests, which
# probably fails (thats probably why You want them in the shell). This
# (prefixing the command with "-") runs eunit but tells make to ignore the
# result.
shell: deps compile
- @$(REBAR) skip_deps=true eunit
@$(ERL) $(ERLFLAGS)
pdf:
pandoc README.md -o README.pdf
clean:
@$(REBAR) clean
- c_src/build_deps.sh clean
- rm -rf $(CURDIR)/test/*.beam
- rm -rf $(CURDIR)/logs
- rm -rf $(CURDIR)/ebin
$(REBAR) skip_deps=true clean
test: eunit
distclean: clean
- rm -rf $(DEPS_PLT)
- rm -rvf $(CURDIR)/deps
eunit: compile
@$(REBAR) eunit skip_deps=true
eunit_console:
@$(ERL) -pa .eunit deps/*/ebin
plt: compile
@$(DIALYZER) --build_plt --output_plt .$(TARGET).plt -pa deps/*/ebin --apps kernel stdlib
analyze: compile
$(DIALYZER) --plt .$(TARGET).plt -pa deps/*/ebin ebin
repl:
$(ERL) -pz deps/*/ebin -pa ebin
gdb-repl:
USE_GDB=1 $(ERL) -pz deps/*/ebin -pa ebin
eunit-repl:
$(ERL) -pa .eunit -pz deps/*/ebin -pz ebin -exec 'cd(".eunit").'
gdb-eunit-repl:
USE_GDB=1 $(ERL) -pa .eunit -pz deps/*/ebin -pz ebin -exec 'cd(".eunit").'
rebuild: distclean deps compile escript dialyzer test

View file

@ -1,26 +1,54 @@
`wterl` is an Erlang interface to the WiredTiger database, and is written
to support a Riak storage backend that uses WiredTiger.
This backend currently supports only key-value storage and retrieval.
`wterl` is an Erlang interface to the WiredTiger database, and is written to
support a Riak storage backend that uses WiredTiger.
Remaining work includes:
* The `wterl:session_create` function currently returns an error under
certain circumstances, so we currently ignore its return value.
* The `riak_kv_wterl_backend` module is currently designed to rely on the
fact that it runs in just a single Erlang scheduler thread, which is
necessary because WiredTiger doesn't allow a session to be used
concurrently by different threads. If the KV node design ever changes to
involve concurrency across scheduler threads, this current design will no
longer work correctly.
TODO:
* Find/fix any code marked "TODO:"
* Why do we see {error, {eperm, _}} result on wterl:cursor_close/1 during
fold_objects/4?
* Why do we see {error, {eperm, _}} result on wterl:cursor_close/1?
* Why do we see {error, {eperm, _}} result on wterl:cursor_next/1 during
is_empty/1?
* Why do we see {error, {eperm, _}} result on wterl:cursor_next_value/1
during status/1?
* Why do we see {error, {ebusy, _}} result on wterl:drop/2?
* Determine a better way to estimate the number of sessions we should
configure WT for at startup in riak_kv_wterl_backend:max_sessions/1.
* Make sure Erlang is optimizing for selective receive in async_nif_enqueue/3
because in the eLevelDB driver there is a comment: "This cannot be a separate
function. Code must be inline to trigger Erlang compiler's use of optimized
selective receive."
* Provide a way to configure the cursor options, right now they are
always "raw,overwrite".
* Add support for Riak/KV 2i indexes using the same design pattern
as eLevelDB (in a future version consider alternate schema)
* If an operation using a shared cursor results in a non-normal error
then it should be closed/discarded from the recycled pool
* Cache cursors based on hash(table/config) rather than just table.
* Finish NIF unload/reload functions and test.
* Test an upgrade, include a format/schema/WT change.
* When WT_PANIC is returned first try to unload/reload then driver
and reset all state, if that fails then exit gracefully.
* Currently the `riak_kv_wterl_backend` module is stored in this
repository, but it really belongs in the `riak_kv` repository.
* There are currently some stability issues with WiredTiger that can
sometimes cause errors when restarting KV nodes with non-empty WiredTiger
storage.
* wterl:truncate/5 can segv, and its tests are commented out
* Add async_nif and wterl NIF stats to the results provided by the
stats API
* Longer term ideas/changes to consider:
* More testing, especially pulse/qc
* Riak/KV integration
* Store 2i indexes in separate tables
* Store buckets, in separate tables and keep a <<bucket/key>> index
to ensure that folds across a vnode are easy
* Provide a drop bucket API call
* Support key expirey
* An ets API (like the LevelDB's lets project)
* Use mime-type to inform WT's schema for key value encoding
* Other use cases within Riak
* An AAE driver using WT
* An ability to store the ring file via WT
Future support for secondary indexes requires WiredTiger features that are
under development but are not yet available.
Deploying
---------

609
c_src/async_nif.h Normal file
View file

@ -0,0 +1,609 @@
/*
* async_nif: An async thread-pool layer for Erlang's NIF API
*
* Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
* Author: Gregory Burd <greg@basho.com> <greg@burd.me>
*
* This file is provided to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
#ifndef __ASYNC_NIF_H__
#define __ASYNC_NIF_H__
#if defined(__cplusplus)
extern "C" {
#endif
#include <assert.h>
#include "queue.h"
#ifndef UNUSED
#define UNUSED(v) ((void)(v))
#endif
#define ASYNC_NIF_MAX_WORKERS 1024
#define ASYNC_NIF_MIN_WORKERS 2
#define ASYNC_NIF_WORKER_QUEUE_SIZE 8192
#define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS
/* Atoms (initialized in on_load) */
static ERL_NIF_TERM ATOM_EAGAIN;
static ERL_NIF_TERM ATOM_ENOMEM;
static ERL_NIF_TERM ATOM_ENQUEUED;
static ERL_NIF_TERM ATOM_ERROR;
static ERL_NIF_TERM ATOM_OK;
static ERL_NIF_TERM ATOM_SHUTDOWN;
struct async_nif_req_entry {
ERL_NIF_TERM ref;
ErlNifEnv *env;
ErlNifPid pid;
void *args;
void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *);
void (*fn_post)(void *);
STAILQ_ENTRY(async_nif_req_entry) entries;
};
struct async_nif_work_queue {
unsigned int num_workers;
unsigned int depth;
ErlNifMutex *reqs_mutex;
ErlNifCond *reqs_cnd;
struct async_nif_work_queue *next;
STAILQ_HEAD(reqs, async_nif_req_entry) reqs;
};
struct async_nif_worker_entry {
ErlNifTid tid;
unsigned int worker_id;
struct async_nif_state *async_nif;
struct async_nif_work_queue *q;
SLIST_ENTRY(async_nif_worker_entry) entries;
};
struct async_nif_state {
unsigned int shutdown;
ErlNifMutex *we_mutex;
unsigned int we_active;
SLIST_HEAD(joining, async_nif_worker_entry) we_joining;
unsigned int num_queues;
unsigned int next_q;
STAILQ_HEAD(recycled_reqs, async_nif_req_entry) recycled_reqs;
unsigned int num_reqs;
ErlNifMutex *recycled_req_mutex;
struct async_nif_work_queue queues[];
};
#define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \
struct decl ## _args frame; \
static void fn_work_ ## decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl ## _args *args) { \
UNUSED(worker_id); \
DPRINTF("async_nif: calling \"%s\"", __func__); \
do work_block while(0); \
DPRINTF("async_nif: returned from \"%s\"", __func__); \
} \
static void fn_post_ ## decl (struct decl ## _args *args) { \
UNUSED(args); \
DPRINTF("async_nif: calling \"fn_post_%s\"", #decl); \
do post_block while(0); \
DPRINTF("async_nif: returned from \"fn_post_%s\"", #decl); \
} \
static ERL_NIF_TERM decl(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv_in[]) { \
struct decl ## _args on_stack_args; \
struct decl ## _args *args = &on_stack_args; \
struct decl ## _args *copy_of_args; \
struct async_nif_req_entry *req = NULL; \
unsigned int affinity = 0; \
ErlNifEnv *new_env = NULL; \
/* argv[0] is a ref used for selective recv */ \
const ERL_NIF_TERM *argv = argv_in + 1; \
argc -= 1; \
/* Note: !!! this assumes that the first element of priv_data is ours */ \
struct async_nif_state *async_nif = *(struct async_nif_state**)enif_priv_data(env); \
if (async_nif->shutdown) \
return enif_make_tuple2(env, ATOM_ERROR, ATOM_SHUTDOWN); \
req = async_nif_reuse_req(async_nif); \
if (!req) \
return enif_make_tuple2(env, ATOM_ERROR, ATOM_ENOMEM); \
new_env = req->env; \
DPRINTF("async_nif: calling \"%s\"", __func__); \
do pre_block while(0); \
DPRINTF("async_nif: returned from \"%s\"", __func__); \
copy_of_args = (struct decl ## _args *)malloc(sizeof(struct decl ## _args)); \
if (!copy_of_args) { \
fn_post_ ## decl (args); \
async_nif_recycle_req(req, async_nif); \
return enif_make_tuple2(env, ATOM_ERROR, ATOM_ENOMEM); \
} \
memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \
req->ref = enif_make_copy(new_env, argv_in[0]); \
enif_self(env, &req->pid); \
req->args = (void*)copy_of_args; \
req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl ; \
req->fn_post = (void (*)(void *))fn_post_ ## decl; \
int h = -1; \
if (affinity) \
h = ((unsigned int)affinity) % async_nif->num_queues; \
ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \
if (!reply) { \
fn_post_ ## decl (args); \
async_nif_recycle_req(req, async_nif); \
free(copy_of_args); \
return enif_make_tuple2(env, ATOM_ERROR, ATOM_EAGAIN); \
} \
return reply; \
}
#define ASYNC_NIF_INIT(name) \
static ErlNifMutex *name##_async_nif_coord = NULL;
#define ASYNC_NIF_LOAD(name, env, priv) do { \
if (!name##_async_nif_coord) \
name##_async_nif_coord = enif_mutex_create("nif_coord load"); \
enif_mutex_lock(name##_async_nif_coord); \
priv = async_nif_load(env); \
enif_mutex_unlock(name##_async_nif_coord); \
} while(0);
#define ASYNC_NIF_UNLOAD(name, env, priv) do { \
if (!name##_async_nif_coord) \
name##_async_nif_coord = enif_mutex_create("nif_coord unload"); \
enif_mutex_lock(name##_async_nif_coord); \
async_nif_unload(env, priv); \
enif_mutex_unlock(name##_async_nif_coord); \
enif_mutex_destroy(name##_async_nif_coord); \
name##_async_nif_coord = NULL; \
} while(0);
#define ASYNC_NIF_UPGRADE(name, env) do { \
if (!name##_async_nif_coord) \
name##_async_nif_coord = enif_mutex_create("nif_coord upgrade"); \
enif_mutex_lock(name##_async_nif_coord); \
async_nif_upgrade(env); \
enif_mutex_unlock(name##_async_nif_coord); \
} while(0);
#define ASYNC_NIF_RETURN_BADARG() do { \
async_nif_recycle_req(req, async_nif); \
return enif_make_badarg(env); \
} while(0);
#define ASYNC_NIF_WORK_ENV new_env
#define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg))
/**
* Return a request structure from the recycled req queue if one exists,
* otherwise create one.
*/
struct async_nif_req_entry *
async_nif_reuse_req(struct async_nif_state *async_nif)
{
struct async_nif_req_entry *req = NULL;
ErlNifEnv *env = NULL;
enif_mutex_lock(async_nif->recycled_req_mutex);
if (STAILQ_EMPTY(&async_nif->recycled_reqs)) {
if (async_nif->num_reqs < ASYNC_NIF_MAX_QUEUED_REQS) {
req = malloc(sizeof(struct async_nif_req_entry));
if (req) {
memset(req, 0, sizeof(struct async_nif_req_entry));
env = enif_alloc_env();
if (env) {
req->env = env;
__sync_fetch_and_add(&async_nif->num_reqs, 1);
} else {
free(req);
req = NULL;
}
}
}
} else {
req = STAILQ_FIRST(&async_nif->recycled_reqs);
STAILQ_REMOVE(&async_nif->recycled_reqs, req, async_nif_req_entry, entries);
}
enif_mutex_unlock(async_nif->recycled_req_mutex);
return req;
}
/**
* Store the request for future re-use.
*
* req a request entry with an ErlNifEnv* which will be cleared
* before reuse, but not until then.
* async_nif a handle to our state so that we can find and use the mutex
*/
void
async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif)
{
ErlNifEnv *env = NULL;
enif_mutex_lock(async_nif->recycled_req_mutex);
enif_clear_env(req->env);
env = req->env;
memset(req, 0, sizeof(struct async_nif_req_entry));
req->env = env;
STAILQ_INSERT_TAIL(&async_nif->recycled_reqs, req, entries);
enif_mutex_unlock(async_nif->recycled_req_mutex);
}
static void *async_nif_worker_fn(void *);
/**
* Start up a worker thread.
*/
static int
async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_queue *q)
{
struct async_nif_worker_entry *we;
if (0 == q)
return EINVAL;
enif_mutex_lock(async_nif->we_mutex);
we = SLIST_FIRST(&async_nif->we_joining);
while(we != NULL) {
struct async_nif_worker_entry *n = SLIST_NEXT(we, entries);
SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
void *exit_value = 0; /* We ignore the thread_join's exit value. */
enif_thread_join(we->tid, &exit_value);
free(we);
async_nif->we_active--;
we = n;
}
if (async_nif->we_active == ASYNC_NIF_MAX_WORKERS) {
enif_mutex_unlock(async_nif->we_mutex);
return EAGAIN;
}
we = malloc(sizeof(struct async_nif_worker_entry));
if (!we) {
enif_mutex_unlock(async_nif->we_mutex);
return ENOMEM;
}
memset(we, 0, sizeof(struct async_nif_worker_entry));
we->worker_id = async_nif->we_active++;
we->async_nif = async_nif;
we->q = q;
enif_mutex_unlock(async_nif->we_mutex);
return enif_thread_create(NULL,&we->tid, &async_nif_worker_fn, (void*)we, 0);
}
/**
* Enqueue a request for processing by a worker thread.
*
* Places the request into a work queue determined either by the
* provided affinity or by iterating through the available queues.
*/
static ERL_NIF_TERM
async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint)
{
/* Identify the most appropriate worker for this request. */
unsigned int i, last_qid, qid = 0;
struct async_nif_work_queue *q = NULL;
double avg_depth = 0.0;
/* Either we're choosing a queue based on some affinity/hinted value or we
need to select the next queue in the rotation and atomically update that
global value (next_q is shared across worker threads) . */
if (hint >= 0) {
qid = (unsigned int)hint;
} else {
do {
last_qid = __sync_fetch_and_add(&async_nif->next_q, 0);
qid = (last_qid + 1) % async_nif->num_queues;
} while (!__sync_bool_compare_and_swap(&async_nif->next_q, last_qid, qid));
}
/* Now we inspect and interate across the set of queues trying to select one
that isn't too full or too slow. */
for (i = 0; i < async_nif->num_queues; i++) {
/* Compute the average queue depth not counting queues which are empty or
the queue we're considering right now. */
unsigned int j, n = 0;
for (j = 0; j < async_nif->num_queues; j++) {
if (j != qid && async_nif->queues[j].depth != 0) {
n++;
avg_depth += async_nif->queues[j].depth;
}
}
if (avg_depth) avg_depth /= n;
/* Lock this queue under consideration, then check for shutdown. While
we hold this lock either a) we're shutting down so exit now or b) this
queue will be valid until we release the lock. */
q = &async_nif->queues[qid];
enif_mutex_lock(q->reqs_mutex);
/* Try not to enqueue a request into a queue that isn't keeping up with
the request volume. */
if (q->depth <= avg_depth) break;
else {
enif_mutex_unlock(q->reqs_mutex);
qid = (qid + 1) % async_nif->num_queues;
}
}
/* If the for loop finished then we didn't find a suitable queue for this
request, meaning we're backed up so trigger eagain. Note that if we left
the loop in this way we hold no lock. */
if (i == async_nif->num_queues) return 0;
/* Add the request to the queue. */
STAILQ_INSERT_TAIL(&q->reqs, req, entries);
__sync_fetch_and_add(&q->depth, 1);
/* We've selected a queue for this new request now check to make sure there are
enough workers actively processing requests on this queue. */
while (q->depth > q->num_workers) {
switch(async_nif_start_worker(async_nif, q)) {
case EINVAL: case ENOMEM: default: return 0;
case EAGAIN: continue;
case 0: __sync_fetch_and_add(&q->num_workers, 1); goto done;
}
}done:;
/* Build the term before releasing the lock so as not to race on the use of
the req pointer (which will soon become invalid in another thread
performing the request). */
double pct_full = (double)avg_depth / (double)ASYNC_NIF_WORKER_QUEUE_SIZE;
ERL_NIF_TERM reply = enif_make_tuple2(req->env, ATOM_OK,
enif_make_tuple2(req->env, ATOM_ENQUEUED,
enif_make_double(req->env, pct_full)));
enif_cond_signal(q->reqs_cnd);
enif_mutex_unlock(q->reqs_mutex);
return reply;
}
/**
* Worker threads execute this function. Here each worker pulls requests of
* their respective queues, executes that work and continues doing that until
* they see the shutdown flag is set at which point they exit.
*/
static void *
async_nif_worker_fn(void *arg)
{
struct async_nif_worker_entry *we = (struct async_nif_worker_entry *)arg;
unsigned int worker_id = we->worker_id;
struct async_nif_state *async_nif = we->async_nif;
struct async_nif_work_queue *q = we->q;
struct async_nif_req_entry *req = NULL;
unsigned int tries = async_nif->num_queues;
for(;;) {
/* Examine the request queue, are there things to be done? */
enif_mutex_lock(q->reqs_mutex);
check_again_for_work:
if (async_nif->shutdown) {
enif_mutex_unlock(q->reqs_mutex);
break;
}
if (STAILQ_EMPTY(&q->reqs)) {
/* Queue is empty so we wait for more work to arrive. */
enif_mutex_unlock(q->reqs_mutex);
if (tries == 0 && q == we->q) {
if (q->num_workers > ASYNC_NIF_MIN_WORKERS) {
/* At this point we've tried to find/execute work on all queues
* and there are at least MIN_WORKERS on this queue so we
* leaving this loop (break) which leads to a thread exit/join. */
break;
} else {
enif_mutex_lock(q->reqs_mutex);
enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
goto check_again_for_work;
}
} else {
tries--;
__sync_fetch_and_add(&q->num_workers, -1);
q = q->next;
__sync_fetch_and_add(&q->num_workers, 1);
continue; // try next queue
}
} else {
/* At this point the next req is ours to process and we hold the
reqs_mutex lock. Take the request off the queue. */
req = STAILQ_FIRST(&q->reqs);
STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries);
__sync_fetch_and_add(&q->depth, -1);
/* Wake up other worker thread watching this queue to help process work. */
enif_cond_signal(q->reqs_cnd);
enif_mutex_unlock(q->reqs_mutex);
/* Perform the work. */
req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args);
/* Now call the post-work cleanup function. */
req->fn_post(req->args);
/* Clean up req for reuse. */
req->ref = 0;
req->fn_work = 0;
req->fn_post = 0;
free(req->args);
req->args = NULL;
async_nif_recycle_req(req, async_nif);
req = NULL;
}
}
enif_mutex_lock(async_nif->we_mutex);
SLIST_INSERT_HEAD(&async_nif->we_joining, we, entries);
enif_mutex_unlock(async_nif->we_mutex);
__sync_fetch_and_add(&q->num_workers, -1);
enif_thread_exit(0);
return 0;
}
static void
async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
{
unsigned int i;
unsigned int num_queues = async_nif->num_queues;
struct async_nif_work_queue *q = NULL;
struct async_nif_req_entry *req = NULL;
struct async_nif_worker_entry *we = NULL;
UNUSED(env);
/* Signal the worker threads, stop what you're doing and exit. To ensure
that we don't race with the enqueue() process we first lock all the worker
queues, then set shutdown to true, then unlock. The enqueue function will
take the queue mutex, then test for shutdown condition, then enqueue only
if not shutting down. */
for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i];
enif_mutex_lock(q->reqs_mutex);
}
/* Set the shutdown flag so that worker threads will no continue
executing requests. */
async_nif->shutdown = 1;
for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i];
enif_mutex_unlock(q->reqs_mutex);
}
/* Join for the now exiting worker threads. */
while(async_nif->we_active > 0) {
for (i = 0; i < num_queues; i++)
enif_cond_broadcast(async_nif->queues[i].reqs_cnd);
enif_mutex_lock(async_nif->we_mutex);
we = SLIST_FIRST(&async_nif->we_joining);
while(we != NULL) {
struct async_nif_worker_entry *n = SLIST_NEXT(we, entries);
SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
void *exit_value = 0; /* We ignore the thread_join's exit value. */
enif_thread_join(we->tid, &exit_value);
free(we);
async_nif->we_active--;
we = n;
}
enif_mutex_unlock(async_nif->we_mutex);
}
enif_mutex_destroy(async_nif->we_mutex);
/* Cleanup in-flight requests, mutexes and conditions in each work queue. */
for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i];
/* Worker threads are stopped, now toss anything left in the queue. */
req = NULL;
req = STAILQ_FIRST(&q->reqs);
while(req != NULL) {
struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
enif_clear_env(req->env);
enif_send(NULL, &req->pid, req->env,
enif_make_tuple2(req->env, ATOM_ERROR, ATOM_SHUTDOWN));
req->fn_post(req->args);
enif_free_env(req->env);
free(req->args);
free(req);
req = n;
}
enif_mutex_destroy(q->reqs_mutex);
enif_cond_destroy(q->reqs_cnd);
}
/* Free any req structures sitting unused on the recycle queue. */
enif_mutex_lock(async_nif->recycled_req_mutex);
req = NULL;
req = STAILQ_FIRST(&async_nif->recycled_reqs);
while(req != NULL) {
struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
enif_free_env(req->env);
free(req);
req = n;
}
enif_mutex_unlock(async_nif->recycled_req_mutex);
enif_mutex_destroy(async_nif->recycled_req_mutex);
memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues));
free(async_nif);
}
static void *
async_nif_load(ErlNifEnv *env)
{
static int has_init = 0;
unsigned int i, num_queues;
ErlNifSysInfo info;
struct async_nif_state *async_nif;
/* Don't init more than once. */
if (has_init) return 0;
else has_init = 1;
/* Init some static references to commonly used atoms. */
ATOM_EAGAIN = enif_make_atom(env, "eagain");
ATOM_ENOMEM = enif_make_atom(env, "enomem");
ATOM_ENQUEUED = enif_make_atom(env, "enqueued");
ATOM_ERROR = enif_make_atom(env, "error");
ATOM_OK = enif_make_atom(env, "ok");
ATOM_SHUTDOWN = enif_make_atom(env, "shutdown");
/* Find out how many schedulers there are. */
enif_system_info(&info, sizeof(ErlNifSysInfo));
/* Size the number of work queues according to schedulers. */
if (info.scheduler_threads > ASYNC_NIF_MAX_WORKERS / 2) {
num_queues = ASYNC_NIF_MAX_WORKERS / 2;
} else {
int remainder = ASYNC_NIF_MAX_WORKERS % info.scheduler_threads;
if (remainder != 0)
num_queues = info.scheduler_threads - remainder;
else
num_queues = info.scheduler_threads;
if (num_queues < 2)
num_queues = 2;
}
/* Init our portion of priv_data's module-specific state. */
async_nif = malloc(sizeof(struct async_nif_state) +
sizeof(struct async_nif_work_queue) * num_queues);
if (!async_nif)
return NULL;
memset(async_nif, 0, sizeof(struct async_nif_state) +
sizeof(struct async_nif_work_queue) * num_queues);
async_nif->num_queues = num_queues;
async_nif->we_active = 0;
async_nif->next_q = 0;
async_nif->shutdown = 0;
STAILQ_INIT(&async_nif->recycled_reqs);
async_nif->recycled_req_mutex = enif_mutex_create("recycled_req");
async_nif->we_mutex = enif_mutex_create("we");
SLIST_INIT(&async_nif->we_joining);
for (i = 0; i < async_nif->num_queues; i++) {
struct async_nif_work_queue *q = &async_nif->queues[i];
STAILQ_INIT(&q->reqs);
q->reqs_mutex = enif_mutex_create("reqs");
q->reqs_cnd = enif_cond_create("reqs");
q->next = &async_nif->queues[(i + 1) % num_queues];
}
return async_nif;
}
static void
async_nif_upgrade(ErlNifEnv *env)
{
UNUSED(env);
// TODO:
}
#if defined(__cplusplus)
}
#endif
#endif // __ASYNC_NIF_H__

View file

@ -1,28 +1,152 @@
#!/bin/bash
# /bin/sh on Solaris is not a POSIX compatible shell, but /usr/bin/ksh is.
if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then
POSIX_SHELL="true"
export POSIX_SHELL
exec /usr/bin/ksh $0 $@
fi
unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as well
set -e
WT_VSN=1.4.2
WT_REPO=http://github.com/wiredtiger/wiredtiger.git
WT_BRANCH=develop
WT_DIR=wiredtiger-`basename $WT_BRANCH`
#WT_REF="tags/1.6.6"
#WT_DIR=wiredtiger-`basename $WT_REF`
SNAPPY_VSN="1.0.4"
SNAPPY_DIR=snappy-$SNAPPY_VSN
[ `basename $PWD` != "c_src" ] && cd c_src
BASEDIR="$PWD"
export BASEDIR="$PWD"
which gmake 1>/dev/null 2>/dev/null && MAKE=gmake
MAKE=${MAKE:-make}
export CPPFLAGS="$CPPLAGS -I $BASEDIR/system/include -O3 -mtune=native -march=native"
export LDFLAGS="$LDFLAGS -L$BASEDIR/system/lib"
export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$BASEDIR/system/lib:$LD_LIBRARY_PATH"
get_wt ()
{
if [ -d $BASEDIR/$WT_DIR/.git ]; then
(cd $BASEDIR/$WT_DIR && git pull -u) || exit 1
else
if [ "X$WT_REF" != "X" ]; then
git clone ${WT_REPO} ${WT_DIR} && \
(cd $BASEDIR/$WT_DIR && git checkout refs/$WT_REF || exit 1)
else
git clone ${WT_REPO} ${WT_DIR} && \
(cd $BASEDIR/$WT_DIR && git checkout -b $WT_BRANCH origin/$WT_BRANCH || exit 1)
fi
fi
[ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1)
(cd $BASEDIR/$WT_DIR
[ -e $BASEDIR/wiredtiger-build.patch ] && \
(patch -p1 --forward < $BASEDIR/wiredtiger-build.patch || exit 1 )
./autogen.sh || exit 1
[ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \
(cd $BASEDIR/$WT_DIR/build_posix && $MAKE distclean)
wt_configure;
)
}
wt_configure ()
{
(cd $BASEDIR/$WT_DIR/build_posix
CFLAGS+=-g $BASEDIR/$WT_DIR/configure --with-pic \
--enable-snappy \
--prefix=${BASEDIR}/system || exit 1)
}
get_snappy ()
{
[ -e snappy-$SNAPPY_VSN.tar.gz ] || (echo "Missing Snappy ($SNAPPY_VSN) source package" && exit 1)
[ -d $BASEDIR/$SNAPPY_DIR ] || tar -xzf snappy-$SNAPPY_VSN.tar.gz
[ -e $BASEDIR/snappy-build.patch ] && \
(cd $BASEDIR/$SNAPPY_DIR
patch -p1 --forward < $BASEDIR/snappy-build.patch || exit 1)
(cd $BASEDIR/$SNAPPY_DIR
./configure --with-pic --prefix=$BASEDIR/system || exit 1)
}
get_deps ()
{
get_snappy;
get_wt;
}
update_deps ()
{
if [ -d $BASEDIR/$WT_DIR/.git ]; then
(cd $BASEDIR/$WT_DIR
if [ "X$WT_VSN" == "X" ]; then
git pull -u || exit 1
else
git checkout $WT_VSN || exit 1
fi
)
fi
}
build_wt ()
{
wt_configure;
(cd $BASEDIR/$WT_DIR/build_posix && $MAKE -j && $MAKE install)
}
build_snappy ()
{
(cd $BASEDIR/$SNAPPY_DIR && \
$MAKE -j && \
$MAKE install
)
}
case "$1" in
clean)
rm -rf system wiredtiger-$WT_VSN
[ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \
(cd $BASEDIR/$WT_DIR/build_posix && $MAKE clean)
rm -rf system $SNAPPY_DIR
rm -f ${BASEDIR}/../priv/wt
rm -f ${BASEDIR}/../priv/libwiredtiger-*.so
rm -f ${BASEDIR}/../priv/libwiredtiger_*.so
rm -f ${BASEDIR}/../priv/libsnappy.so.*
;;
test)
(cd $BASEDIR/$WT_DIR && $MAKE -j test)
;;
update-deps)
update-deps;
;;
get-deps)
get_deps;
;;
*)
test -f system/lib/libwiredtiger.a && exit 0
shopt -s extglob
SUFFIXES='@(so|dylib)'
tar -xjf wiredtiger-$WT_VSN.tar.bz2
(cd wiredtiger-$WT_VSN/build_posix && \
../configure --with-pic \
--prefix=$BASEDIR/system && \
make && make install)
# Build Snappy
[ -d $SNAPPY_DIR ] || get_snappy;
[ -d $BASEDIR/$SNAPPY_DIR ] || (echo "Missing Snappy source directory" && exit 1)
test -f $BASEDIR/system/lib/libsnappy.so.[0-9].[0-9].[0-9].* || build_snappy;
# Build WiredTiger
[ -d $WT_DIR ] || get_wt;
[ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1)
test -f $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].${SUFFIXES} -a \
-f $BASEDIR/system/lib/libwiredtiger_snappy.${SUFFIXES} || build_wt;
[ -d $BASEDIR/../priv ] || mkdir ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/bin/wt ${BASEDIR}/../priv
cp -p -P ${BASEDIR}/system/lib/libwiredtiger-[0-9].[0-9].[0-9].${SUFFIXES} ${BASEDIR}/../priv
cp -p -P ${BASEDIR}/system/lib/libwiredtiger_snappy.${SUFFIXES} ${BASEDIR}/../priv
cp -p -P ${BASEDIR}/system/lib/libsnappy.${SUFFIXES}* ${BASEDIR}/../priv
;;
esac

66
c_src/common.h Normal file
View file

@ -0,0 +1,66 @@
/*
* Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
* Author: Gregory Burd <greg@basho.com> <greg@burd.me>
*
* This file is provided to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef __COMMON_H__
#define __COMMON_H__
#if defined(__cplusplus)
extern "C" {
#endif
#if !(__STDC_VERSION__ >= 199901L || defined(__GNUC__))
# undef DEBUG
# define DEBUG 0
# define DPRINTF (void) /* Vararg macros may be unsupported */
#elif DEBUG
#include <stdio.h>
#include <stdarg.h>
#define DPRINTF(fmt, ...) \
do { \
fprintf(stderr, "%s:%d " fmt "\n", __FILE__, __LINE__, __VA_ARGS__); \
fflush(stderr); \
} while(0)
#define DPUTS(arg) DPRINTF("%s", arg)
#else
#define DPRINTF(fmt, ...) ((void) 0)
#define DPUTS(arg) ((void) 0)
#endif
#ifndef __UNUSED
#define __UNUSED(v) ((void)(v))
#endif
#ifndef COMPQUIET
#define COMPQUIET(n, v) do { \
(n) = (v); \
(n) = (n); \
} while (0)
#endif
#ifdef __APPLE__
#define PRIuint64(x) (x)
#else
#define PRIuint64(x) (unsigned long long)(x)
#endif
#if defined(__cplusplus)
}
#endif
#endif // __COMMON_H__

678
c_src/queue.h Normal file
View file

@ -0,0 +1,678 @@
/*
* Copyright (c) 1991, 1993
* The Regents of the University of California. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 4. Neither the name of the University nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* @(#)queue.h 8.5 (Berkeley) 8/20/94
* $FreeBSD: src/sys/sys/queue.h,v 1.54 2002/08/05 05:18:43 alfred Exp $
*/
#ifndef _DB_QUEUE_H_
#define _DB_QUEUE_H_
#ifndef __offsetof
#define __offsetof(st, m) \
((size_t) ( (char *)&((st *)0)->m - (char *)0 ))
#endif
#ifndef __containerof
#define __containerof(ptr, type, member) ({ \
const typeof( ((type *)0)->member ) *__mptr = (ptr); \
(type *)( (char *)__mptr - __offsetof(type,member) );})
#endif
#if defined(__cplusplus)
extern "C" {
#endif
/*
* This file defines four types of data structures: singly-linked lists,
* singly-linked tail queues, lists and tail queues.
*
* A singly-linked list is headed by a single forward pointer. The elements
* are singly linked for minimum space and pointer manipulation overhead at
* the expense of O(n) removal for arbitrary elements. New elements can be
* added to the list after an existing element or at the head of the list.
* Elements being removed from the head of the list should use the explicit
* macro for this purpose for optimum efficiency. A singly-linked list may
* only be traversed in the forward direction. Singly-linked lists are ideal
* for applications with large datasets and few or no removals or for
* implementing a LIFO queue.
*
* A singly-linked tail queue is headed by a pair of pointers, one to the
* head of the list and the other to the tail of the list. The elements are
* singly linked for minimum space and pointer manipulation overhead at the
* expense of O(n) removal for arbitrary elements. New elements can be added
* to the list after an existing element, at the head of the list, or at the
* end of the list. Elements being removed from the head of the tail queue
* should use the explicit macro for this purpose for optimum efficiency.
* A singly-linked tail queue may only be traversed in the forward direction.
* Singly-linked tail queues are ideal for applications with large datasets
* and few or no removals or for implementing a FIFO queue.
*
* A list is headed by a single forward pointer (or an array of forward
* pointers for a hash table header). The elements are doubly linked
* so that an arbitrary element can be removed without a need to
* traverse the list. New elements can be added to the list before
* or after an existing element or at the head of the list. A list
* may only be traversed in the forward direction.
*
* A tail queue is headed by a pair of pointers, one to the head of the
* list and the other to the tail of the list. The elements are doubly
* linked so that an arbitrary element can be removed without a need to
* traverse the list. New elements can be added to the list before or
* after an existing element, at the head of the list, or at the end of
* the list. A tail queue may be traversed in either direction.
*
* For details on the use of these macros, see the queue(3) manual page.
*
*
* SLIST LIST STAILQ TAILQ
* _HEAD + + + +
* _HEAD_INITIALIZER + + + +
* _ENTRY + + + +
* _INIT + + + +
* _EMPTY + + + +
* _FIRST + + + +
* _NEXT + + + +
* _PREV - - - +
* _LAST - - + +
* _FOREACH + + + +
* _FOREACH_REVERSE - - - +
* _INSERT_HEAD + + + +
* _INSERT_BEFORE - + - +
* _INSERT_AFTER + + + +
* _INSERT_TAIL - - + +
* _CONCAT - - + +
* _REMOVE_HEAD + - + -
* _REMOVE + + + +
*
*/
/*
* XXX
* We #undef all of the macros because there are incompatible versions of this
* file and these macros on various systems. What makes the problem worse is
* they are included and/or defined by system include files which we may have
* already loaded into Berkeley DB before getting here. For example, FreeBSD's
* <rpc/rpc.h> includes its system <sys/queue.h>, and VxWorks UnixLib.h defines
* several of the LIST_XXX macros. Visual C.NET 7.0 also defines some of these
* same macros in Vc7\PlatformSDK\Include\WinNT.h. Make sure we use ours.
*/
#undef LIST_EMPTY
#undef LIST_ENTRY
#undef LIST_FIRST
#undef LIST_FOREACH
#undef LIST_HEAD
#undef LIST_HEAD_INITIALIZER
#undef LIST_INIT
#undef LIST_INSERT_AFTER
#undef LIST_INSERT_BEFORE
#undef LIST_INSERT_HEAD
#undef LIST_NEXT
#undef LIST_REMOVE
#undef QMD_TRACE_ELEM
#undef QMD_TRACE_HEAD
#undef QUEUE_MACRO_DEBUG
#undef SLIST_EMPTY
#undef SLIST_ENTRY
#undef SLIST_FIRST
#undef SLIST_FOREACH
#undef SLIST_FOREACH_PREVPTR
#undef SLIST_HEAD
#undef SLIST_HEAD_INITIALIZER
#undef SLIST_INIT
#undef SLIST_INSERT_AFTER
#undef SLIST_INSERT_HEAD
#undef SLIST_NEXT
#undef SLIST_REMOVE
#undef SLIST_REMOVE_HEAD
#undef STAILQ_CONCAT
#undef STAILQ_EMPTY
#undef STAILQ_ENTRY
#undef STAILQ_FIRST
#undef STAILQ_FOREACH
#undef STAILQ_HEAD
#undef STAILQ_HEAD_INITIALIZER
#undef STAILQ_INIT
#undef STAILQ_INSERT_AFTER
#undef STAILQ_INSERT_HEAD
#undef STAILQ_INSERT_TAIL
#undef STAILQ_LAST
#undef STAILQ_NEXT
#undef STAILQ_REMOVE
#undef STAILQ_REMOVE_HEAD
#undef STAILQ_REMOVE_HEAD_UNTIL
#undef TAILQ_CONCAT
#undef TAILQ_EMPTY
#undef TAILQ_ENTRY
#undef TAILQ_FIRST
#undef TAILQ_FOREACH
#undef TAILQ_FOREACH_REVERSE
#undef TAILQ_HEAD
#undef TAILQ_HEAD_INITIALIZER
#undef TAILQ_INIT
#undef TAILQ_INSERT_AFTER
#undef TAILQ_INSERT_BEFORE
#undef TAILQ_INSERT_HEAD
#undef TAILQ_INSERT_TAIL
#undef TAILQ_LAST
#undef TAILQ_NEXT
#undef TAILQ_PREV
#undef TAILQ_REMOVE
#undef TRACEBUF
#undef TRASHIT
#define QUEUE_MACRO_DEBUG 0
#if QUEUE_MACRO_DEBUG
/* Store the last 2 places the queue element or head was altered */
struct qm_trace {
char * lastfile;
int lastline;
char * prevfile;
int prevline;
};
#define TRACEBUF struct qm_trace trace;
#define TRASHIT(x) do {(x) = (void *)-1;} while (0)
#define QMD_TRACE_HEAD(head) do { \
(head)->trace.prevline = (head)->trace.lastline; \
(head)->trace.prevfile = (head)->trace.lastfile; \
(head)->trace.lastline = __LINE__; \
(head)->trace.lastfile = __FILE__; \
} while (0)
#define QMD_TRACE_ELEM(elem) do { \
(elem)->trace.prevline = (elem)->trace.lastline; \
(elem)->trace.prevfile = (elem)->trace.lastfile; \
(elem)->trace.lastline = __LINE__; \
(elem)->trace.lastfile = __FILE__; \
} while (0)
#else
#define QMD_TRACE_ELEM(elem)
#define QMD_TRACE_HEAD(head)
#define TRACEBUF
#define TRASHIT(x)
#endif /* QUEUE_MACRO_DEBUG */
/*
* Singly-linked List declarations.
*/
#define SLIST_HEAD(name, type) \
struct name { \
struct type *slh_first; /* first element */ \
}
#define SLIST_HEAD_INITIALIZER(head) \
{ NULL }
#define SLIST_ENTRY(type) \
struct { \
struct type *sle_next; /* next element */ \
}
/*
* Singly-linked List functions.
*/
#define SLIST_EMPTY(head) ((head)->slh_first == NULL)
#define SLIST_FIRST(head) ((head)->slh_first)
#define SLIST_FOREACH(var, head, field) \
for ((var) = SLIST_FIRST((head)); \
(var); \
(var) = SLIST_NEXT((var), field))
#define SLIST_FOREACH_PREVPTR(var, varp, head, field) \
for ((varp) = &SLIST_FIRST((head)); \
((var) = *(varp)) != NULL; \
(varp) = &SLIST_NEXT((var), field))
#define SLIST_INIT(head) do { \
SLIST_FIRST((head)) = NULL; \
} while (0)
#define SLIST_INSERT_AFTER(slistelm, elm, field) do { \
SLIST_NEXT((elm), field) = SLIST_NEXT((slistelm), field); \
SLIST_NEXT((slistelm), field) = (elm); \
} while (0)
#define SLIST_INSERT_HEAD(head, elm, field) do { \
SLIST_NEXT((elm), field) = SLIST_FIRST((head)); \
SLIST_FIRST((head)) = (elm); \
} while (0)
#define SLIST_NEXT(elm, field) ((elm)->field.sle_next)
#define SLIST_REMOVE(head, elm, type, field) do { \
if (SLIST_FIRST((head)) == (elm)) { \
SLIST_REMOVE_HEAD((head), field); \
} \
else { \
struct type *curelm = SLIST_FIRST((head)); \
while (SLIST_NEXT(curelm, field) != (elm)) \
curelm = SLIST_NEXT(curelm, field); \
SLIST_NEXT(curelm, field) = \
SLIST_NEXT(SLIST_NEXT(curelm, field), field); \
} \
} while (0)
#define SLIST_REMOVE_HEAD(head, field) do { \
SLIST_FIRST((head)) = SLIST_NEXT(SLIST_FIRST((head)), field); \
} while (0)
/*
* Singly-linked Tail queue declarations.
*/
#define STAILQ_HEAD(name, type) \
struct name { \
struct type *stqh_first;/* first element */ \
struct type **stqh_last;/* addr of last next element */ \
}
#define STAILQ_HEAD_INITIALIZER(head) \
{ NULL, &(head).stqh_first }
#define STAILQ_ENTRY(type) \
struct { \
struct type *stqe_next; /* next element */ \
}
/*
* Singly-linked Tail queue functions.
*/
#define STAILQ_CONCAT(head1, head2) do { \
if (!STAILQ_EMPTY((head2))) { \
*(head1)->stqh_last = (head2)->stqh_first; \
(head1)->stqh_last = (head2)->stqh_last; \
STAILQ_INIT((head2)); \
} \
} while (0)
#define STAILQ_EMPTY(head) ((head)->stqh_first == NULL)
#define STAILQ_FIRST(head) ((head)->stqh_first)
#define STAILQ_FOREACH(var, head, field) \
for ((var) = STAILQ_FIRST((head)); \
(var); \
(var) = STAILQ_NEXT((var), field))
#define STAILQ_INIT(head) do { \
STAILQ_FIRST((head)) = NULL; \
(head)->stqh_last = &STAILQ_FIRST((head)); \
} while (0)
#define STAILQ_INSERT_AFTER(head, tqelm, elm, field) do { \
if ((STAILQ_NEXT((elm), field) = STAILQ_NEXT((tqelm), field)) == NULL)\
(head)->stqh_last = &STAILQ_NEXT((elm), field); \
STAILQ_NEXT((tqelm), field) = (elm); \
} while (0)
#define STAILQ_INSERT_HEAD(head, elm, field) do { \
if ((STAILQ_NEXT((elm), field) = STAILQ_FIRST((head))) == NULL) \
(head)->stqh_last = &STAILQ_NEXT((elm), field); \
STAILQ_FIRST((head)) = (elm); \
} while (0)
#define STAILQ_INSERT_TAIL(head, elm, field) do { \
STAILQ_NEXT((elm), field) = NULL; \
*(head)->stqh_last = (elm); \
(head)->stqh_last = &STAILQ_NEXT((elm), field); \
} while (0)
#define STAILQ_LAST(head, type, field) \
(STAILQ_EMPTY((head)) ? \
NULL : \
((struct type *) \
((char *)((head)->stqh_last) - __offsetof(struct type, field))))
#define STAILQ_NEXT(elm, field) ((elm)->field.stqe_next)
#define STAILQ_REMOVE(head, elm, type, field) do { \
if (STAILQ_FIRST((head)) == (elm)) { \
STAILQ_REMOVE_HEAD((head), field); \
} \
else { \
struct type *curelm = STAILQ_FIRST((head)); \
while (STAILQ_NEXT(curelm, field) != (elm)) \
curelm = STAILQ_NEXT(curelm, field); \
if ((STAILQ_NEXT(curelm, field) = \
STAILQ_NEXT(STAILQ_NEXT(curelm, field), field)) == NULL)\
(head)->stqh_last = &STAILQ_NEXT((curelm), field);\
} \
} while (0)
#define STAILQ_REMOVE_HEAD(head, field) do { \
if ((STAILQ_FIRST((head)) = \
STAILQ_NEXT(STAILQ_FIRST((head)), field)) == NULL) \
(head)->stqh_last = &STAILQ_FIRST((head)); \
} while (0)
#define STAILQ_REMOVE_HEAD_UNTIL(head, elm, field) do { \
if ((STAILQ_FIRST((head)) = STAILQ_NEXT((elm), field)) == NULL) \
(head)->stqh_last = &STAILQ_FIRST((head)); \
} while (0)
/*
* List declarations.
*/
#define LIST_HEAD(name, type) \
struct name { \
struct type *lh_first; /* first element */ \
}
#define LIST_HEAD_INITIALIZER(head) \
{ NULL }
#define LIST_ENTRY(type) \
struct { \
struct type *le_next; /* next element */ \
struct type **le_prev; /* address of previous next element */ \
}
/*
* List functions.
*/
#define LIST_EMPTY(head) ((head)->lh_first == NULL)
#define LIST_FIRST(head) ((head)->lh_first)
#define LIST_FOREACH(var, head, field) \
for ((var) = LIST_FIRST((head)); \
(var); \
(var) = LIST_NEXT((var), field))
#define LIST_INIT(head) do { \
LIST_FIRST((head)) = NULL; \
} while (0)
#define LIST_INSERT_AFTER(listelm, elm, field) do { \
if ((LIST_NEXT((elm), field) = LIST_NEXT((listelm), field)) != NULL)\
LIST_NEXT((listelm), field)->field.le_prev = \
&LIST_NEXT((elm), field); \
LIST_NEXT((listelm), field) = (elm); \
(elm)->field.le_prev = &LIST_NEXT((listelm), field); \
} while (0)
#define LIST_INSERT_BEFORE(listelm, elm, field) do { \
(elm)->field.le_prev = (listelm)->field.le_prev; \
LIST_NEXT((elm), field) = (listelm); \
*(listelm)->field.le_prev = (elm); \
(listelm)->field.le_prev = &LIST_NEXT((elm), field); \
} while (0)
#define LIST_INSERT_HEAD(head, elm, field) do { \
if ((LIST_NEXT((elm), field) = LIST_FIRST((head))) != NULL) \
LIST_FIRST((head))->field.le_prev = &LIST_NEXT((elm), field);\
LIST_FIRST((head)) = (elm); \
(elm)->field.le_prev = &LIST_FIRST((head)); \
} while (0)
#define LIST_NEXT(elm, field) ((elm)->field.le_next)
#define LIST_REMOVE(elm, field) do { \
if (LIST_NEXT((elm), field) != NULL) \
LIST_NEXT((elm), field)->field.le_prev = \
(elm)->field.le_prev; \
*(elm)->field.le_prev = LIST_NEXT((elm), field); \
} while (0)
/*
* Tail queue declarations.
*/
#define TAILQ_HEAD(name, type) \
struct name { \
struct type *tqh_first; /* first element */ \
struct type **tqh_last; /* addr of last next element */ \
TRACEBUF \
}
#define TAILQ_HEAD_INITIALIZER(head) \
{ NULL, &(head).tqh_first }
#define TAILQ_ENTRY(type) \
struct { \
struct type *tqe_next; /* next element */ \
struct type **tqe_prev; /* address of previous next element */ \
TRACEBUF \
}
/*
* Tail queue functions.
*/
#define TAILQ_CONCAT(head1, head2, field) do { \
if (!TAILQ_EMPTY(head2)) { \
*(head1)->tqh_last = (head2)->tqh_first; \
(head2)->tqh_first->field.tqe_prev = (head1)->tqh_last; \
(head1)->tqh_last = (head2)->tqh_last; \
TAILQ_INIT((head2)); \
QMD_TRACE_HEAD(head); \
QMD_TRACE_HEAD(head2); \
} \
} while (0)
#define TAILQ_EMPTY(head) ((head)->tqh_first == NULL)
#define TAILQ_FIRST(head) ((head)->tqh_first)
#define TAILQ_FOREACH(var, head, field) \
for ((var) = TAILQ_FIRST((head)); \
(var); \
(var) = TAILQ_NEXT((var), field))
#define TAILQ_FOREACH_REVERSE(var, head, headname, field) \
for ((var) = TAILQ_LAST((head), headname); \
(var); \
(var) = TAILQ_PREV((var), headname, field))
#define TAILQ_INIT(head) do { \
TAILQ_FIRST((head)) = NULL; \
(head)->tqh_last = &TAILQ_FIRST((head)); \
QMD_TRACE_HEAD(head); \
} while (0)
#define TAILQ_INSERT_AFTER(head, listelm, elm, field) do { \
if ((TAILQ_NEXT((elm), field) = TAILQ_NEXT((listelm), field)) != NULL)\
TAILQ_NEXT((elm), field)->field.tqe_prev = \
&TAILQ_NEXT((elm), field); \
else { \
(head)->tqh_last = &TAILQ_NEXT((elm), field); \
QMD_TRACE_HEAD(head); \
} \
TAILQ_NEXT((listelm), field) = (elm); \
(elm)->field.tqe_prev = &TAILQ_NEXT((listelm), field); \
QMD_TRACE_ELEM(&(elm)->field); \
QMD_TRACE_ELEM(&listelm->field); \
} while (0)
#define TAILQ_INSERT_BEFORE(listelm, elm, field) do { \
(elm)->field.tqe_prev = (listelm)->field.tqe_prev; \
TAILQ_NEXT((elm), field) = (listelm); \
*(listelm)->field.tqe_prev = (elm); \
(listelm)->field.tqe_prev = &TAILQ_NEXT((elm), field); \
QMD_TRACE_ELEM(&(elm)->field); \
QMD_TRACE_ELEM(&listelm->field); \
} while (0)
#define TAILQ_INSERT_HEAD(head, elm, field) do { \
if ((TAILQ_NEXT((elm), field) = TAILQ_FIRST((head))) != NULL) \
TAILQ_FIRST((head))->field.tqe_prev = \
&TAILQ_NEXT((elm), field); \
else \
(head)->tqh_last = &TAILQ_NEXT((elm), field); \
TAILQ_FIRST((head)) = (elm); \
(elm)->field.tqe_prev = &TAILQ_FIRST((head)); \
QMD_TRACE_HEAD(head); \
QMD_TRACE_ELEM(&(elm)->field); \
} while (0)
#define TAILQ_INSERT_TAIL(head, elm, field) do { \
TAILQ_NEXT((elm), field) = NULL; \
(elm)->field.tqe_prev = (head)->tqh_last; \
*(head)->tqh_last = (elm); \
(head)->tqh_last = &TAILQ_NEXT((elm), field); \
QMD_TRACE_HEAD(head); \
QMD_TRACE_ELEM(&(elm)->field); \
} while (0)
#define TAILQ_LAST(head, headname) \
(*(((struct headname *)((head)->tqh_last))->tqh_last))
#define TAILQ_NEXT(elm, field) ((elm)->field.tqe_next)
#define TAILQ_PREV(elm, headname, field) \
(*(((struct headname *)((elm)->field.tqe_prev))->tqh_last))
#define TAILQ_REMOVE(head, elm, field) do { \
if ((TAILQ_NEXT((elm), field)) != NULL) \
TAILQ_NEXT((elm), field)->field.tqe_prev = \
(elm)->field.tqe_prev; \
else { \
(head)->tqh_last = (elm)->field.tqe_prev; \
QMD_TRACE_HEAD(head); \
} \
*(elm)->field.tqe_prev = TAILQ_NEXT((elm), field); \
TRASHIT((elm)->field.tqe_next); \
TRASHIT((elm)->field.tqe_prev); \
QMD_TRACE_ELEM(&(elm)->field); \
} while (0)
/*
* Circular queue definitions.
*/
#define CIRCLEQ_HEAD(name, type) \
struct name { \
struct type *cqh_first; /* first element */ \
struct type *cqh_last; /* last element */ \
}
#define CIRCLEQ_HEAD_INITIALIZER(head) \
{ (void *)&head, (void *)&head }
#define CIRCLEQ_ENTRY(type) \
struct { \
struct type *cqe_next; /* next element */ \
struct type *cqe_prev; /* previous element */ \
}
/*
* Circular queue functions.
*/
#define CIRCLEQ_INIT(head) do { \
(head)->cqh_first = (void *)(head); \
(head)->cqh_last = (void *)(head); \
} while (/*CONSTCOND*/0)
#define CIRCLEQ_INSERT_AFTER(head, listelm, elm, field) do { \
(elm)->field.cqe_next = (listelm)->field.cqe_next; \
(elm)->field.cqe_prev = (listelm); \
if ((listelm)->field.cqe_next == (void *)(head)) \
(head)->cqh_last = (elm); \
else \
(listelm)->field.cqe_next->field.cqe_prev = (elm); \
(listelm)->field.cqe_next = (elm); \
} while (/*CONSTCOND*/0)
#define CIRCLEQ_INSERT_BEFORE(head, listelm, elm, field) do { \
(elm)->field.cqe_next = (listelm); \
(elm)->field.cqe_prev = (listelm)->field.cqe_prev; \
if ((listelm)->field.cqe_prev == (void *)(head)) \
(head)->cqh_first = (elm); \
else \
(listelm)->field.cqe_prev->field.cqe_next = (elm); \
(listelm)->field.cqe_prev = (elm); \
} while (/*CONSTCOND*/0)
#define CIRCLEQ_INSERT_HEAD(head, elm, field) do { \
(elm)->field.cqe_next = (head)->cqh_first; \
(elm)->field.cqe_prev = (void *)(head); \
if ((head)->cqh_last == (void *)(head)) \
(head)->cqh_last = (elm); \
else \
(head)->cqh_first->field.cqe_prev = (elm); \
(head)->cqh_first = (elm); \
} while (/*CONSTCOND*/0)
#define CIRCLEQ_INSERT_TAIL(head, elm, field) do { \
(elm)->field.cqe_next = (void *)(head); \
(elm)->field.cqe_prev = (head)->cqh_last; \
if ((head)->cqh_first == (void *)(head)) \
(head)->cqh_first = (elm); \
else \
(head)->cqh_last->field.cqe_next = (elm); \
(head)->cqh_last = (elm); \
} while (/*CONSTCOND*/0)
#define CIRCLEQ_REMOVE(head, elm, field) do { \
if ((elm)->field.cqe_next == (void *)(head)) \
(head)->cqh_last = (elm)->field.cqe_prev; \
else \
(elm)->field.cqe_next->field.cqe_prev = \
(elm)->field.cqe_prev; \
if ((elm)->field.cqe_prev == (void *)(head)) \
(head)->cqh_first = (elm)->field.cqe_next; \
else \
(elm)->field.cqe_prev->field.cqe_next = \
(elm)->field.cqe_next; \
} while (/*CONSTCOND*/0)
#define CIRCLEQ_FOREACH(var, head, field) \
for ((var) = ((head)->cqh_first); \
(var) != (const void *)(head); \
(var) = ((var)->field.cqe_next))
#define CIRCLEQ_FOREACH_REVERSE(var, head, field) \
for ((var) = ((head)->cqh_last); \
(var) != (const void *)(head); \
(var) = ((var)->field.cqe_prev))
/*
* Circular queue access methods.
*/
#define CIRCLEQ_EMPTY(head) ((head)->cqh_first == (void *)(head))
#define CIRCLEQ_FIRST(head) ((head)->cqh_first)
#define CIRCLEQ_LAST(head) ((head)->cqh_last)
#define CIRCLEQ_NEXT(elm, field) ((elm)->field.cqe_next)
#define CIRCLEQ_PREV(elm, field) ((elm)->field.cqe_prev)
#define CIRCLEQ_LOOP_NEXT(head, elm, field) \
(((elm)->field.cqe_next == (void *)(head)) \
? ((head)->cqh_first) \
: (elm->field.cqe_next))
#define CIRCLEQ_LOOP_PREV(head, elm, field) \
(((elm)->field.cqe_prev == (void *)(head)) \
? ((head)->cqh_last) \
: (elm->field.cqe_prev))
#if defined(__cplusplus)
}
#endif
#endif /* !_DB_QUEUE_H_ */

BIN
c_src/snappy-1.0.4.tar.gz Normal file

Binary file not shown.

Binary file not shown.

View file

@ -0,0 +1,12 @@
diff --git a/ext/compressors/snappy/Makefile.am b/ext/compressors/snappy/Makefile.am
index 6d78823..c423590 100644
--- a/ext/compressors/snappy/Makefile.am
+++ b/ext/compressors/snappy/Makefile.am
@@ -2,5 +2,6 @@ AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include
lib_LTLIBRARIES = libwiredtiger_snappy.la
libwiredtiger_snappy_la_SOURCES = snappy_compress.c
-libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module
+libwiredtiger_snappy_la_CFLAGS = -I$(abs_top_builddir)/../../system/include
+libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module -L$(abs_top_builddir)/../../system/lib -Wl,-rpath,lib/wterl-0.9.0/priv:lib/wterl/priv:priv
libwiredtiger_snappy_la_LIBADD = -lsnappy

File diff suppressed because it is too large Load diff

View file

@ -35,7 +35,7 @@ fi
rebar get-deps
file=./deps/riak_kv/src/riak_kv.app.src
if ! grep -q hanoidb $file && ! grep -q wterl $file ; then
if ! grep -q wterl $file ; then
echo
echo "Modifying $file, saving the original as ${file}.orig ..."
perl -i.orig -pe '/\bos_mon,/ && print qq( wterl,\n)' $file

6
priv/wterl.schema Normal file
View file

@ -0,0 +1,6 @@
%%%% This is the WiredTiger section
%% @doc wiredtiger data_root
{mapping, "wiredtiger.data_root", "wterl.data_root", [
{default, "{{platform_data_dir}}/wiredtiger"}
]}.

View file

@ -1,13 +1,46 @@
%%-*- mode: erlang -*-
%% ex: ft=erlang ts=4 sw=4 et
{require_otp_vsn, "R1[567]"}.
{cover_enabled, true}.
%{eunit_opts, [verbose, {report, {eunit_surefire, [{dir, "."}]}}]}.
{erl_opts, [
%native, {hipe, [o3,verbose]}, inline, {inline_size, 1024},
{parse_transform, lager_transform},
debug_info,
{d,'DEBUG',true},
strict_validation,
fail_on_warning,
%warn_missing_spec,
warn_bif_clash,
warn_deprecated_function,
warn_export_all,
warn_export_vars,
warn_exported_vars,
warn_obsolete_guard,
warn_shadow_vars,
warn_untyped_record,
warn_unused_function,
warn_unused_import,
warn_unused_record,
warn_unused_vars
]}.
{xref_checks, [undefined_function_calls, deprecated_function_calls]}.
{deps, [
{lager, "2.*", {git, "git://github.com/basho/lager", {branch, "master"}}}
]}.
{port_specs, [{"priv/wterl.so", ["c_src/*.c"]}]}.
{port_env, [
{"DRV_CFLAGS", "$DRV_CFLAGS -Werror -I c_src/system/include"},
{"DRV_LDFLAGS", "$DRV_LDFLAGS c_src/system/lib/libwiredtiger.a"}
{"DRV_CFLAGS", "$DRV_CFLAGS -O3 -mtune=native -march=native -fPIC -Wall -Wextra -Werror -I c_src/system/include"},
{"DRV_LDFLAGS", "$DRV_LDFLAGS -Wl,-rpath,lib/wterl/priv:lib/wterl-0.9.0/priv:priv -Lc_src/system/lib -lwiredtiger"}
]}.
{pre_hooks, [{compile, "c_src/build_deps.sh"}]}.
{pre_hooks, [{compile, "c_src/build_deps.sh compile"}]}.
{post_hooks, [{clean, "c_src/build_deps.sh clean"}]}.

54
src/async_nif.hrl Normal file
View file

@ -0,0 +1,54 @@
%% -------------------------------------------------------------------
%%
%% async_nif: An async thread-pool layer for Erlang's NIF API
%%
%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
%% Author: Gregory Burd <greg@basho.com> <greg@burd.me>
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-define(ASYNC_NIF_CALL(Fun, Args),
F = fun(F, T) ->
R = erlang:make_ref(),
case erlang:apply(Fun, [R|Args]) of
{ok, {enqueued, PctBusy}} ->
if
PctBusy > 0.25 andalso PctBusy =< 1.0 ->
erlang:bump_reductions(erlang:trunc(2000 * PctBusy));
true ->
ok
end,
receive
{R, {error, shutdown}=Error} ->
%% Work unit was queued, but not executed.
Error;
{R, {error, _Reason}=Error} ->
%% Work unit returned an error.
Error;
{R, Reply} ->
Reply
end;
{error, eagain} ->
case T of
3 -> not_found;
_ -> F(F, T + 1)
end;
Other ->
Other
end
end,
F(F, 1)).

View file

@ -2,7 +2,7 @@
%%
%% riak_kv_wterl_backend: WiredTiger Driver for Riak
%%
%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
%% Copyright (c) 2012-2013 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
@ -22,7 +22,7 @@
-module(riak_kv_wterl_backend).
-behavior(temp_riak_kv_backend).
-author('Steve Vinoski <steve@basho.com>').
-compile([{parse_transform, lager_transform}]).
%% KV Backend API
-export([api_version/0,
@ -43,6 +43,7 @@
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
-endif.
-define(API_VERSION, 1).
@ -50,10 +51,9 @@
%%-define(CAPABILITIES, [async_fold, indexes]).
-define(CAPABILITIES, [async_fold]).
-record(state, {conn :: wterl:connection(),
table :: string(),
session :: wterl:session(),
partition :: integer()}).
-record(state, {table :: string(),
type :: string(),
connection :: wterl:connection()}).
-type state() :: #state{}.
-type config() :: [{atom(), term()}].
@ -81,57 +81,79 @@ capabilities(_, _) ->
%% @doc Start the wterl backend
-spec start(integer(), config()) -> {ok, state()} | {error, term()}.
start(Partition, Config) ->
%% Get the data root directory
case app_helper:get_prop_or_env(data_root, Config, wterl) of
undefined ->
lager:error("Failed to create wterl dir: data_root is not set"),
{error, data_root_unset};
DataRoot ->
AppStart = case application:start(wterl) of
AppStart =
case application:start(wterl) of
ok ->
ok;
{error, {already_started, _}} ->
ok;
{error, Reason} ->
lager:error("Failed to start wterl: ~p", [Reason]),
{error, Reason}
{error, Reason1} ->
lager:error("Failed to start wterl: ~p", [Reason1]),
{error, Reason1}
end,
case AppStart of
ok ->
ok = filelib:ensure_dir(filename:join(DataRoot, "x")),
case wterl_conn:open(DataRoot, Config) of
{ok, ConnRef} ->
Table = "table:wt" ++ integer_to_list(Partition),
{ok, SRef} = wterl:session_open(ConnRef),
ok = wterl:session_create(SRef, Table),
{ok, #state{conn=ConnRef,
table=Table,
session=SRef,
partition=Partition}};
{error, ConnReason}=ConnError ->
lager:error("Failed to start wterl backend: ~p\n",
[ConnReason]),
ConnError
end;
Error ->
Error
Type =
case wterl:config_value(type, Config, "lsm") of
{type, "lsm"} -> "lsm";
{type, "table"} -> "table";
{type, "btree"} -> "table";
{type, BadType} ->
lager:info("wterl:start ignoring unknown type ~p, using lsm instead", [BadType]),
"lsm";
_ ->
lager:info("wterl:start ignoring mistaken setting defaulting to lsm"),
"lsm"
end,
{ok, Connection} = establish_connection(Config, Type),
Table = Type ++ ":" ++ integer_to_list(Partition),
Compressor =
case wterl:config_value(block_compressor, Config, "snappy") of
{block_compressor, "snappy"}=C -> [C];
{block_compressor, "none"} -> [];
{block_compressor, none} -> [];
{block_compressor, _} -> [{block_compressor, "snappy"}];
_ -> [{block_compressor, "snappy"}]
end,
TableOpts =
case Type of
"lsm" ->
[{internal_page_max, "128K"},
{leaf_page_max, "16K"},
{lsm, [
{bloom_config, [{leaf_page_max, "8MB"}]},
{bloom_bit_count, 28},
{bloom_hash_count, 19},
{bloom_oldest, true},
{chunk_size, "100MB"},
{merge_threads, 2}
]}
] ++ Compressor;
"table" ->
Compressor
end,
case wterl:create(Connection, Table, TableOpts) of
ok ->
{ok, #state{table=Table, type=Type,
connection=Connection}};
{error, Reason3} ->
{error, Reason3}
end
end.
%% @doc Stop the wterl backend
-spec stop(state()) -> ok.
stop(#state{conn=ConnRef, session=SRef}) ->
ok = wterl:session_close(SRef),
wterl_conn:close(ConnRef).
stop(_State) ->
ok. %% The connection is closed by wterl_conn:stop()
%% @doc Retrieve an object from the wterl backend
-spec get(riak_object:bucket(), riak_object:key(), state()) ->
{ok, any(), state()} |
{ok, not_found, state()} |
{error, term(), state()}.
get(Bucket, Key, #state{table=Table, session=SRef}=State) ->
get(Bucket, Key, #state{connection=Connection, table=Table}=State) ->
WTKey = to_object_key(Bucket, Key),
case wterl:session_get(SRef, Table, WTKey) of
case wterl:get(Connection, Table, WTKey) of
{ok, Value} ->
{ok, Value, State};
not_found ->
@ -148,9 +170,8 @@ get(Bucket, Key, #state{table=Table, session=SRef}=State) ->
-spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) ->
{ok, state()} |
{error, term(), state()}.
put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{table=Table, session=SRef}=State) ->
WTKey = to_object_key(Bucket, PrimaryKey),
case wterl:session_put(SRef, Table, WTKey, Val) of
put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{connection=Connection, table=Table}=State) ->
case wterl:put(Connection, Table, to_object_key(Bucket, PrimaryKey), Val) of
ok ->
{ok, State};
{error, Reason} ->
@ -164,9 +185,8 @@ put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{table=Table, session=SRef}=Stat
-spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) ->
{ok, state()} |
{error, term(), state()}.
delete(Bucket, Key, _IndexSpecs, #state{table=Table, session=SRef}=State) ->
WTKey = to_object_key(Bucket, Key),
case wterl:session_delete(SRef, Table, WTKey) of
delete(Bucket, Key, _IndexSpecs, #state{connection=Connection, table=Table}=State) ->
case wterl:delete(Connection, Table, to_object_key(Bucket, Key)) of
ok ->
{ok, State};
{error, Reason} ->
@ -178,12 +198,14 @@ delete(Bucket, Key, _IndexSpecs, #state{table=Table, session=SRef}=State) ->
any(),
[],
state()) -> {ok, any()} | {async, fun()}.
fold_buckets(FoldBucketsFun, Acc, Opts, #state{conn=ConnRef, table=Table}) ->
fold_buckets(FoldBucketsFun, Acc, Opts, #state{connection=Connection, table=Table}) ->
FoldFun = fold_buckets_fun(FoldBucketsFun),
BucketFolder =
fun() ->
{ok, SRef} = wterl:session_open(ConnRef),
{ok, Cursor} = wterl:cursor_open(SRef, Table),
case wterl:cursor_open(Connection, Table) of
{error, {enoent, _Message}} ->
Acc;
{ok, Cursor} ->
try
{FoldResult, _} =
wterl:fold_keys(Cursor, FoldFun, {Acc, []}),
@ -192,8 +214,8 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{conn=ConnRef, table=Table}) ->
{break, AccFinal} ->
AccFinal
after
ok = wterl:cursor_close(Cursor),
ok = wterl:session_close(SRef)
ok = wterl:cursor_close(Cursor)
end
end
end,
case lists:member(async_fold, Opts) of
@ -208,7 +230,7 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{conn=ConnRef, table=Table}) ->
any(),
[{atom(), term()}],
state()) -> {ok, term()} | {async, fun()}.
fold_keys(FoldKeysFun, Acc, Opts, #state{conn=ConnRef, table=Table}) ->
fold_keys(FoldKeysFun, Acc, Opts, #state{connection=Connection, table=Table}) ->
%% Figure out how we should limit the fold: by bucket, by
%% secondary index, or neither (fold across everything.)
Bucket = lists:keyfind(bucket, 1, Opts),
@ -225,16 +247,18 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{conn=ConnRef, table=Table}) ->
FoldFun = fold_keys_fun(FoldKeysFun, Limiter),
KeyFolder =
fun() ->
{ok, SRef} = wterl:session_open(ConnRef),
{ok, Cursor} = wterl:cursor_open(SRef, Table),
case wterl:cursor_open(Connection, Table) of
{error, {enoent, _Message}} ->
Acc;
{ok, Cursor} ->
try
wterl:fold_keys(Cursor, FoldFun, Acc)
catch
{break, AccFinal} ->
AccFinal
after
ok = wterl:cursor_close(Cursor),
ok = wterl:session_close(SRef)
ok = wterl:cursor_close(Cursor)
end
end
end,
case lists:member(async_fold, Opts) of
@ -249,21 +273,30 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{conn=ConnRef, table=Table}) ->
any(),
[{atom(), term()}],
state()) -> {ok, any()} | {async, fun()}.
fold_objects(FoldObjectsFun, Acc, Opts, #state{conn=ConnRef, table=Table}) ->
fold_objects(FoldObjectsFun, Acc, Opts, #state{connection=Connection, table=Table}) ->
Bucket = proplists:get_value(bucket, Opts),
FoldFun = fold_objects_fun(FoldObjectsFun, Bucket),
ObjectFolder =
fun() ->
{ok, SRef} = wterl:session_open(ConnRef),
{ok, Cursor} = wterl:cursor_open(SRef, Table),
case wterl:cursor_open(Connection, Table) of
{error, {enoent, _Message}} ->
Acc;
{ok, Cursor} ->
try
wterl:fold(Cursor, FoldFun, Acc)
catch
{break, AccFinal} ->
AccFinal
after
ok = wterl:cursor_close(Cursor),
ok = wterl:session_close(SRef)
case wterl:cursor_close(Cursor) of
ok ->
ok;
{error, {eperm, _}} -> %% TODO: review/fix
ok;
{error, _}=E ->
E
end
end
end
end,
case lists:member(async_fold, Opts) of
@ -275,10 +308,12 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{conn=ConnRef, table=Table}) ->
%% @doc Delete all objects from this wterl backend
-spec drop(state()) -> {ok, state()} | {error, term(), state()}.
drop(#state{table=Table, session=SRef}=State) ->
case wterl:session_truncate(SRef, Table) of
drop(#state{connection=Connection, table=Table}=State) ->
case wterl:drop(Connection, Table) of
ok ->
{ok, State};
{error, {ebusy, _}} -> %% TODO: review/fix
{ok, State};
Error ->
{error, Error, State}
end.
@ -286,24 +321,44 @@ drop(#state{table=Table, session=SRef}=State) ->
%% @doc Returns true if this wterl backend contains any
%% non-tombstone values; otherwise returns false.
-spec is_empty(state()) -> boolean().
is_empty(#state{table=Table, session=SRef}) ->
{ok, Cursor} = wterl:cursor_open(SRef, Table),
try
not_found =:= wterl:cursor_next(Cursor)
after
ok = wterl:cursor_close(Cursor)
is_empty(#state{connection=Connection, table=Table}) ->
case wterl:cursor_open(Connection, Table) of
{ok, Cursor} ->
IsEmpty =
case wterl:cursor_next(Cursor) of
not_found ->
true;
{error, {eperm, _}} ->
false; % TODO: review/fix this logic
_ ->
false
end,
wterl:cursor_close(Cursor),
IsEmpty;
{error, Reason2} ->
{error, Reason2}
end.
%% @doc Get the status information for this wterl backend
-spec status(state()) -> [{atom(), term()}].
status(#state{table=Table, session=SRef}) ->
{ok, Cursor} = wterl:cursor_open(SRef, "statistics:"++Table),
try
Stats = fetch_status(Cursor),
[{stats, Stats}]
after
ok = wterl:cursor_close(Cursor)
end.
status(#state{connection=Connection, table=Table}) ->
[].
%% case wterl:cursor_open(Connection, "statistics:" ++ Table, [{statistics_fast, true}]) of
%% {ok, Cursor} ->
%% TheStats =
%% case fetch_status(Cursor) of
%% {ok, Stats} ->
%% Stats;
%% {error, {eperm, _}} -> % TODO: review/fix this logic
%% {ok, []};
%% _ ->
%% {ok, []}
%% end,
%% wterl:cursor_close(Cursor),
%% TheStats;
%% {error, Reason2} ->
%% {error, Reason2}
%% end.
%% @doc Register an asynchronous callback
-spec callback(reference(), any(), state()) -> {ok, state()}.
@ -315,6 +370,85 @@ callback(_Ref, _Msg, State) ->
%% Internal functions
%% ===================================================================
%% @private
max_sessions(Config) ->
RingSize =
case app_helper:get_prop_or_env(ring_creation_size, Config, riak_core) of
undefined -> 1024;
Size -> Size
end,
Est = RingSize * erlang:system_info(schedulers),
case Est > 8192 of
true ->
8192;
false ->
case Est < 1024 of
true ->
1024;
false ->
Est
end
end.
%% @private
establish_connection(Config, Type) ->
%% Get the data root directory
case app_helper:get_prop_or_env(data_root, Config, wterl) of
undefined ->
lager:error("Failed to create wterl dir: data_root is not set"),
{error, data_root_unset};
DataRoot ->
ok = filelib:ensure_dir(filename:join(DataRoot, "x")),
%% WT Connection Options:
LogSetting = app_helper:get_prop_or_env(log, Config, wterl, false),
CheckpointSetting =
case Type =:= "lsm" of
true ->
case LogSetting of
true ->
%% Turn checkpoints on if logging is on, checkpoints enable log archival.
app_helper:get_prop_or_env(checkpoint, Config, wterl, [{wait, 30}]); % in seconds
_ ->
[]
end;
false ->
app_helper:get_prop_or_env(checkpoint, Config, wterl, [{wait, 30}])
end,
RequestedCacheSize = app_helper:get_prop_or_env(cache_size, Config, wterl),
ConnectionOpts =
orddict:from_list(
[ wterl:config_value(create, Config, true),
wterl:config_value(checkpoint_sync, Config, false),
wterl:config_value(transaction_sync, Config, "none"),
wterl:config_value(log, Config, [{enabled, LogSetting}]),
wterl:config_value(mmap, Config, false),
wterl:config_value(checkpoint, Config, CheckpointSetting),
wterl:config_value(session_max, Config, max_sessions(Config)),
wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)),
wterl:config_value(statistics, Config, [ "fast", "clear"]),
wterl:config_value(statistics_log, Config, [{wait, 600}]), % in seconds
wterl:config_value(verbose, Config, [ "salvage", "verify"
% Note: for some unknown reason, if you add these additional
% verbose flags Erlang SEGV's "size_object: bad tag for 0x80"
% no idea why... you've been warned.
%"block", "shared_cache", "reconcile", "evict", "lsm",
%"fileops", "read", "write", "readserver", "evictserver",
%"hazard", "mutex", "ckpt"
]) ] ++ proplists:get_value(wterl, Config, [])), % sec
%% WT Session Options:
SessionOpts = [{isolation, "snapshot"}],
case wterl_conn:open(DataRoot, ConnectionOpts, SessionOpts) of
{ok, Connection} ->
{ok, Connection};
{error, Reason2} ->
lager:error("Failed to establish a WiredTiger connection, wterl backend unable to start: ~p\n", [Reason2]),
{error, Reason2}
end
end.
%% @private
%% Return a function to fold over the buckets on this backend
fold_buckets_fun(FoldBucketsFun) ->
@ -422,13 +556,53 @@ from_index_key(LKey) ->
%% @private
%% Return all status from wterl statistics cursor
fetch_status(Cursor) ->
fetch_status(Cursor, wterl:cursor_next_value(Cursor), []).
fetch_status(_Cursor, not_found, Acc) ->
lists:reverse(Acc);
fetch_status(Cursor, {ok, Stat}, Acc) ->
[What,Val|_] = [binary_to_list(B) || B <- binary:split(Stat, [<<0>>], [global])],
fetch_status(Cursor, wterl:cursor_next_value(Cursor), [{What,Val}|Acc]).
%% fetch_status(Cursor) ->
%% {ok, fetch_status(Cursor, wterl:cursor_next_value(Cursor), [])}.
%% fetch_status(_Cursor, {error, _}, Acc) ->
%% lists:reverse(Acc);
%% fetch_status(_Cursor, not_found, Acc) ->
%% lists:reverse(Acc);
%% fetch_status(Cursor, {ok, Stat}, Acc) ->
%% [What,Val|_] = [binary_to_list(B) || B <- binary:split(Stat, [<<0>>], [global])],
%% fetch_status(Cursor, wterl:cursor_next_value(Cursor), [{What,Val}|Acc]).
size_cache(RequestedSize) ->
Size =
case RequestedSize of
undefined ->
RunningApps = application:which_applications(),
FinalGuess =
case proplists:is_defined(sasl, RunningApps) andalso
proplists:is_defined(os_mon, RunningApps) of
true ->
Memory = memsup:get_system_memory_data(),
TotalRAM = proplists:get_value(system_total_memory, Memory),
FreeRAM = proplists:get_value(free_memory, Memory),
UsedByBeam = proplists:get_value(total, erlang:memory()),
Target = ((TotalRAM - UsedByBeam) div 3),
FirstGuess = (Target - (Target rem (1024 * 1024))),
SecondGuess =
case FirstGuess > FreeRAM of
true -> FreeRAM - (FreeRAM rem (1024 * 1024));
_ -> FirstGuess
end,
case SecondGuess < 1073741824 of %% < 1GB?
true -> "1GB";
false ->
ThirdGuess = SecondGuess div (1024 * 1024),
integer_to_list(ThirdGuess) ++ "MB"
end;
false ->
"1GB"
end,
application:set_env(wterl, cache_size, FinalGuess),
FinalGuess;
Value when is_list(Value) ->
Value;
Value when is_number(Value) ->
integer_to_list(Value)
end,
Size.
%% ===================================================================
%% EUnit tests
@ -436,12 +610,14 @@ fetch_status(Cursor, {ok, Stat}, Acc) ->
-ifdef(TEST).
simple_test_() ->
?assertCmd("rm -rf test/wterl-backend"),
{ok, CWD} = file:get_cwd(),
rmdir:path(filename:join([CWD, "test/wterl-backend"])), %?assertCmd("rm -rf test/wterl-backend"),
application:set_env(wterl, data_root, "test/wterl-backend"),
temp_riak_kv_backend:standard_test(?MODULE, []).
custom_config_test_() ->
?assertCmd("rm -rf test/wterl-backend"),
{ok, CWD} = file:get_cwd(),
rmdir:path(filename:join([CWD, "test/wterl-backend"])), %?assertCmd("rm -rf test/wterl-backend"),
application:set_env(wterl, data_root, ""),
temp_riak_kv_backend:standard_test(?MODULE, [{data_root, "test/wterl-backend"}]).

26
src/rmdir.erl Normal file
View file

@ -0,0 +1,26 @@
-module(rmdir).
-export([path/1]).
-include_lib("kernel/include/file.hrl").
path(Dir) ->
remove_all_files(".", [Dir]).
remove_all_files(Dir, Files) ->
lists:foreach(fun(File) ->
FilePath = filename:join([Dir, File]),
case file:read_file_info(FilePath) of
{ok, FileInfo} ->
case FileInfo#file_info.type of
directory ->
{ok, DirFiles} = file:list_dir(FilePath),
remove_all_files(FilePath, DirFiles),
file:del_dir(FilePath);
_ ->
file:delete(FilePath)
end;
{error, _Reason} ->
ok
end
end, Files).

View file

@ -272,11 +272,16 @@ empty_check({Backend, State}) ->
}.
setup({BackendMod, Config}) ->
%% Start the backend
application:start(lager),
application:start(sasl),
application:start(os_mon),
{ok, S} = BackendMod:start(42, Config),
{BackendMod, S}.
cleanup({BackendMod, S}) ->
ok = BackendMod:stop(S).
ok = BackendMod:stop(S),
application:stop(lager),
application:stop(sasl),
application:stop(os_mon).
-endif. % TEST

View file

@ -1,6 +1,6 @@
{application, wterl,
[
{description, "Erlang Wrapper for WiredTiger"},
{description, "Erlang NIF Wrapper for WiredTiger"},
{vsn, "0.9.0"},
{registered, []},
{applications, [

File diff suppressed because it is too large Load diff

View file

@ -30,15 +30,13 @@
%% API
-export([start_link/0, stop/0,
open/1, open/2, is_open/0, get/0, close/1]).
open/1, open/2, open/3, is_open/0, get/0, close/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {
conn :: wterl:connection()
}).
-record(state, { conn :: wterl:connection() }).
-type config_list() :: [{atom(), any()}].
@ -55,12 +53,14 @@ stop() ->
gen_server:cast(?MODULE, stop).
-spec open(string()) -> {ok, wterl:connection()} | {error, term()}.
open(Dir) ->
open(Dir, []).
-spec open(string(), config_list()) -> {ok, wterl:connection()} | {error, term()}.
open(Dir, Config) ->
gen_server:call(?MODULE, {open, Dir, Config, self()}, infinity).
-spec open(string(), config_list(), config_list()) -> {ok, wterl:connection()} | {error, term()}.
open(Dir) ->
open(Dir, [], []).
open(Dir, ConnectionConfig) ->
gen_server:call(?MODULE, {open, Dir, ConnectionConfig, [], self()}, infinity).
open(Dir, ConnectionConfig, SessionConfig) ->
gen_server:call(?MODULE, {open, Dir, ConnectionConfig, SessionConfig, self()}, infinity).
-spec is_open() -> boolean().
is_open() ->
@ -82,11 +82,9 @@ init([]) ->
true = wterl_ets:table_ready(),
{ok, #state{}}.
handle_call({open, Dir, Config, Caller}, _From, #state{conn=undefined}=State) ->
Opts = [{create, true},
config_value(cache_size, Config, "100MB"),
config_value(session_max, Config, 100)],
{Reply, NState} = case wterl:conn_open(Dir, wterl:config_to_bin(Opts)) of
handle_call({open, Dir, ConnectionConfig, SessionConfig, Caller}, _From, #state{conn=undefined}=State) ->
{Reply, NState} =
case wterl:connection_open(Dir, ConnectionConfig, SessionConfig) of
{ok, ConnRef}=OK ->
Monitor = erlang:monitor(process, Caller),
true = ets:insert(wterl_ets, {Monitor, Caller}),
@ -95,7 +93,7 @@ handle_call({open, Dir, Config, Caller}, _From, #state{conn=undefined}=State) ->
{Error, State}
end,
{reply, Reply, NState};
handle_call({open, _Dir, _Config, Caller}, _From,#state{conn=ConnRef}=State) ->
handle_call({open, _Dir, _ConnectionConfig, _SessionConfig, Caller}, _From, #state{conn=ConnRef}=State) ->
Monitor = erlang:monitor(process, Caller),
true = ets:insert(wterl_ets, {Monitor, Caller}),
{reply, {ok, ConnRef}, State};
@ -168,11 +166,7 @@ code_change(_OldVsn, State, _Extra) ->
do_close(undefined) ->
ok;
do_close(ConnRef) ->
wterl:conn_close(ConnRef).
%% @private
config_value(Key, Config, Default) ->
{Key, app_helper:get_prop_or_env(Key, Config, wterl, Default)}.
wterl:connection_close(ConnRef).
-ifdef(TEST).
@ -215,14 +209,14 @@ simple_test_() ->
end}]}.
open_one() ->
{ok, Ref} = open("test/wterl-backend", [{session_max, 20},{cache_size, "1MB"}]),
{ok, Ref} = open("test/wterl-backend", [{create, true}, {session_max, 20},{cache_size, "1MB"}]),
true = is_open(),
close(Ref),
false = is_open(),
ok.
open_and_wait(Pid) ->
{ok, Ref} = open("test/wterl-backend"),
{ok, Ref} = open("test/wterl-backend", [{create, true}]),
Pid ! open,
receive
close ->

111
src/wterl_event_handler.erl Normal file
View file

@ -0,0 +1,111 @@
%% -------------------------------------------------------------------
%%
%% wterl: Erlang Wrapper for WiredTiger
%%
%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(wterl_event_handler).
-behaviour(gen_server).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
%% API
-export([start_link/0, stop/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(PREFIX, "wiredtiger").
%% ====================================================================
%% API
%% ====================================================================
-spec start_link() -> {ok, pid()} | {error, term()}.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec stop() -> ok.
stop() ->
gen_server:cast(?MODULE, stop).
%% ====================================================================
%% gen_server callbacks
%% ====================================================================
init([]) ->
wterl:set_event_handler_pid(self()),
{ok, []}.
handle_call(_Msg, _From, State) ->
{reply, ok, State}.
handle_cast(stop, State) ->
{stop, normal, State};
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({error, {Errno, Message}}, State) ->
log(error, "~s: (~s) ~s", [?PREFIX, Errno, Message]),
{noreply, State};
handle_info({message, Info}, State) ->
log(info, "~s: ~s", [?PREFIX, Info]),
{noreply, State};
handle_info({progress, {Operation, Counter}}, State) ->
log(info, "~s: progress on ~s [~b]", [?PREFIX, Operation, Counter]),
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% ====================================================================
%% Internal functions
%% ====================================================================
%% @private
-spec log(error | info, string(), [any()]) -> ok.
log(Urgency, Format, Args) ->
case proplists:is_defined(lager, application:which_applications()) of
true ->
log(lager, Urgency, Format, Args);
false ->
log(stdio, Urgency, Format, Args)
end.
-spec log(lager | stdio, error | info, string(), [any()]) -> ok.
log(lager, error, Format, Args) ->
lager:error(Format, Args);
log(lager, info, Format, Args) ->
lager:info(Format, Args);
log(stdio, _, Format, Args) ->
io:format(Format ++ "~n", Args).
%% ===================================================================
%% EUnit tests
%% ===================================================================
-ifdef(TEST).
-endif.

View file

@ -46,4 +46,5 @@ start_link() ->
init([]) ->
{ok, {{one_for_one, 5, 10}, [?CHILD(wterl_ets, worker),
?CHILD(wterl_conn, worker)]}}.
?CHILD(wterl_conn, worker),
?CHILD(wterl_event_handler, worker)]}}.

View file

@ -0,0 +1,91 @@
-module(basho_bench_driver_wterl).
-record(state, { connection, uri }).
-export([new/1,
run/4]).
-include_lib("basho_bench/include/basho_bench.hrl").
%% ====================================================================
%% API
%% ====================================================================
new(1) ->
%% Make sure wterl is available
case code:which(wterl) of
non_existing ->
?FAIL_MSG("~s requires wterl to be available on code path.\n",
[?MODULE]);
_ ->
ok
end,
{ok, _} = wterl_sup:start_link(),
setup(1);
new(Id) ->
setup(Id).
setup(Id) ->
%% Get the target directory
Dir = basho_bench_config:get(wterl_dir, "/tmp"),
Config = basho_bench_config:get(wterl, []),
Uri = config_value(table_uri, Config, "lsm:test"),
ConnectionOpts = config_value(connection, Config, [{create,true},{session_max, 8192}]),
SessionOpts = config_value(session, Config, []),
TableOpts = config_value(table, Config, []),
%% Start WiredTiger
Connection =
case wterl_conn:is_open() of
false ->
case wterl_conn:open(Dir, ConnectionOpts, SessionOpts) of
{ok, Conn} ->
Conn;
{error, Reason0} ->
?FAIL_MSG("Failed to establish a WiredTiger connection for ~p, wterl backend unable to start: ~p\n", [Id, Reason0])
end;
true ->
{ok, Conn} = wterl_conn:get(),
Conn
end,
case wterl:create(Connection, Uri, TableOpts) of
ok ->
{ok, #state{connection=Connection, uri=Uri}};
{error, Reason} ->
{error, Reason}
end.
run(get, KeyGen, _ValueGen, #state{connection=Connection, uri=Uri}=State) ->
case wterl:get(Connection, Uri, KeyGen()) of
{ok, _Value} ->
{ok, State};
not_found ->
{ok, State};
{error, Reason} ->
{error, Reason}
end;
run(put, KeyGen, ValueGen, #state{connection=Connection, uri=Uri}=State) ->
case wterl:put(Connection, Uri, KeyGen(), ValueGen()) of
ok ->
{ok, State};
{error, Reason} ->
{error, Reason}
end;
run(delete, KeyGen, _ValueGen, #state{connection=Connection, uri=Uri}=State) ->
case wterl:delete(Connection, Uri, KeyGen()) of
ok ->
{ok, State};
not_found ->
{ok, State};
{error, Reason} ->
{error, Reason}
end.
config_value(Key, Config, Default) ->
case proplists:get_value(Key, Config) of
undefined ->
Default;
Value ->
Value
end.

102
tools/wterl-b_b.config Normal file
View file

@ -0,0 +1,102 @@
%%-*- mode: erlang -*-
%% ex: ft=erlang ts=4 sw=4 et
%% How to:
%% * put the wterl-b_b.config file into basho_bench/examples
%% * put the basho_bench_driver_wterl.erl into basho_bench/src
%% * make clean in basho_bench, then make
%% * edit examples/wterl-b_b.config
%% - change {code_paths, ["../wterl"]}. to be a relative path to your
%% wterl directory
%% - change {wterl_dir, "/home/gburd/ws/basho_bench/data"}. to a fully
%% qualified location for your test data files (mkdir that directory
%% yourself, if it doesn't exist the test will fail 'enoent')
%% * to run, replace this path with the proper path on your system:
%% LD_LIBRARY_PATH=/home/you/wterl/priv ./basho_bench examples/wterl-b_b.config
%% * the test should run for 10 minutes (as it is configured right now)
%% with 4 concurrent workers accessing the same table
%%
%% Note:
%% There are two config sections in wt.config {wterl, [ ... ]}. and
%% {wterl_, [ ... ]}. The one being used is named "wterl" the other
%% config is ignored. I setup an LSM and BTREE config and to choose
%% which is run you just rename those two sections (turn one off by
%% adding a "_" to the name and take the "_" out of the other's name).
{mode, max}.
{duration, 10}.
{concurrent, 16}.
{report_interval, 1}.
{pb_timeout_general, 1000}. % ms
%{pb_timeout_read, ?}.
%{pb_timeout_write, ?}.
%{pb_timeout_listkeys, ?}.
%{pb_timeout_mapreduce, ?}.
{driver, basho_bench_driver_wterl}.
{key_generator, {int_to_bin_littleendian,{uniform_int, 5000000}}}.
{value_generator, {fixed_bin, 10000}}.
{operations, [{get, 4}, {put, 4}, {delete, 2}]}.
{code_paths, ["../wterl"]}.
{wterl_dir, "/home/gburd/ws/basho_bench/data"}.
%% lsm
{wterl, [
{connection, [
{create, true},
{session_sync, false},
{transaction_sync, "none"},
{log, [{enabled, false}]},
{session_max, 1024},
{cache_size, 4294967296},
{verbose, []},
% "salvage", "verify" are okay, however...
% for some unknown reason, if you add these additional
% verbose flags Erlang SEGV's "size_object: bad tag for 0x80"
% no idea why... yet... you've been warned.
%"block", "shared_cache", "reconcile", "evict", "lsm",
%"fileops", "read", "write", "readserver", "evictserver",
%"hazard", "mutex", "ckpt"
{statistics_log, [{wait, 30}]}
]},
{session, [ {isolation, "snapshot"} ]},
{table_uri, "lsm:test"},
{lsm_merge_threads, 2},
{table, [
{internal_page_max, "128K"},
{leaf_page_max, "128K"},
{lsm_chunk_size, "25MB"},
{lsm_bloom_newest, true},
{lsm_bloom_oldest, true} ,
{lsm_bloom_bit_count, 128},
{lsm_bloom_hash_count, 64},
{lsm_bloom_config, [{leaf_page_max, "8MB"}]},
{block_compressor, "snappy"} % bzip2
]}
]}.
%% btree
{wterl_, [
{connection, [
{create, true},
{session_sync, false},
{transaction_sync, "none"},
{log, [{enabled, false}]},
{session_max, 1024},
{cache_size, 4294967296},
{verbose, []},
% "salvage", "verify" are okay, however...
% for some unknown reason, if you add these additional
% verbose flags Erlang SEGV's "size_object: bad tag for 0x80"
% no idea why... yet... you've been warned.
%"block", "shared_cache", "reconcile", "evict", "lsm",
%"fileops", "read", "write", "readserver", "evictserver",
%"hazard", "mutex", "ckpt"
{statistics_log, [{wait, 30}]},
{checkpoint, [{await, 10}]}
]},
{session, [ {isolation, "snapshot"} ]},
{table_uri, "table:test"},
{table, [
{block_compressor, "snappy"} % bzip2
]}
]}.

10
update-version.sh Executable file
View file

@ -0,0 +1,10 @@
#!/bin/sh -
# Note: also, remember to update version numbers in rpath specs so that shared libs can be found at runtime!!!
wterl=`git describe --always --long --tags`
wiredtiger0=`(cd c_src/wiredtiger-[0-9.]* && git describe --always --long --tags)`
wiredtiger=`echo $wiredtiger0 | awk '{print $2}'`
echo $wterl
echo $wiredtiger