added packages and conf directories. added update and scan operations.

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@2546 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
michim 2011-05-12 20:41:18 +00:00
parent abda0606de
commit 5291a25dce
8 changed files with 241 additions and 55 deletions

10
conf/dht_lsm_server.ini Normal file
View file

@ -0,0 +1,10 @@
env_dir=/home/y/var/dht_bdb_server/data
port=9090
num_threads=$(num_threads)
num_partitions=$(num_partitions)
num_retries=100
key_buffer_size_bytes=2048
value_buffer_size_bytes=1150976
checkpoint_frequency_msec=1000
checkpoint_min_change_kb=10

12
packages/Makefile Normal file
View file

@ -0,0 +1,12 @@
ROOT=/home/y
include $(ROOT)/share/yahoo_cfg/Make.defs
# Just to pick up "clean" and "package" rules
all:: package-release
test:: package-test
clean:: package-clean
include $(ROOT)/share/yahoo_cfg/Make.rules

9
packages/dht_lsm_server.run Executable file
View file

@ -0,0 +1,9 @@
#!/bin/sh
#
# This is the 'run' script used by daemontools to start dht_lsm_server.
# See the dist page for more details about daemontools.
#
# http://dist.corp.yahoo.com/by-package/daemontools_y/
#
cd /home/y/var/dht_lsm_server/data
exec /home/y/bin/setuidgid nobody /home/y/bin64/lsm_server 2>&1

View file

@ -0,0 +1,42 @@
PRODUCT_NAME = dht_lsm_server
VERSION=1.0.0
SHORT_DESC = LSM-Tree
LONG_DESC = LSM-Tree persistent store for sherpa
CUSTODIAN = ydht-devel@yahoo-inc.com http://twiki.corp.yahoo.com/view/YDHT
OWNER = root
GROUP = wheel
PERM = 0444
YINST bug-product dht
YINST bug-component General
YINST requires pkg daemontools_y ROOT ROOT_MAX
YINST requires pkg dht_persistent_store64 ROOT ROOT_MAX
dir 0777 - - logs/dht_lsm_server
dir 0755 - - var/daemontools/dht_lsm_server
dir 0755 - - var/daemontools/dht_lsm_server/log
dir 0777 - - conf/dht_lsm_server
dir 0777 - - var/dht_lsm_server/data
file 0555 - - bin64/ ../sherpa/main/lsm_server
file 0444 - - lib64/liblogstore.so.1 ../build/liblogstore.so
symlink 0444 - - lib64/liblogstore.so liblogstore.so.1
file 0444 - - lib64/ ../sherpa/libLSMServer.so.1
symlink 0444 - - lib64/libLSMServer.so libLSMServer.so.1
file 0555 - - var/daemontools/dht_lsm_server/run dht_lsm_server.run
file 0555 - - var/daemontools/dht_lsm_server/log/run dht_lsm_server_log.run
configfile 0644 - - conf/dht_lsm_server/dht_lsm_server.ini ../conf/dht_lsm_server.ini expand overwrite
YINST set log_level INFO
YINST set cache_size_gb 4
YINST set num_partitions 32
YINST set num_threads 32
YINST start 30 $ROOT/bin/svcstart dht_lsm_server
YINST stop 30 $ROOT/bin/svcstop dht_lsm_server
YINST pre-deactivate $ROOT/bin/svcstop dht_lsm_server

View file

@ -0,0 +1,2 @@
#!/bin/sh
exec /home/y/bin/setuidgid nobody /usr/sbin/cronolog -l /home/y/logs/dht_lsm_server/stdout.log "/home/y/logs/dht_lsm_server/stdout.log.%Y%m%d"

View file

@ -49,6 +49,7 @@ LSMServerHandler(int argc, char **argv)
}
}
pthread_mutex_init(&mutex_, 0);
logtable<datatuple>::init_stasis();
int xid = Tbegin();
@ -85,7 +86,43 @@ LSMServerHandler(int argc, char **argv)
*/
}
//logtable<datatuple>::deinit_stasis();
initNextDatabaseId();
}
void LSMServerHandler::
initNextDatabaseId()
{
nextDatabaseId_ = 1;
uint32_t id = 0;
datatuple* start = buildTuple(id, "");
datatuple* end = buildTuple(id + 1, "");
logtable<datatuple>::iterator* itr = new logtable<datatuple>::iterator(ltable_, start);
datatuple* current;
while ((current = itr->getnext())) {
// are we at the end of range?
if (datatuple::compare_obj(current, end) >= 0) {
datatuple::freetuple(current);
break;
}
uint32_t currentId = *((uint32_t*)(current->data()));
if (currentId > nextDatabaseId_) {
nextDatabaseId_ = currentId;
}
datatuple::freetuple(current);
}
nextDatabaseId_++;
delete itr;
}
uint32_t LSMServerHandler::
nextDatabaseId()
{
uint32_t id;
pthread_mutex_lock(&mutex_);
nextDatabaseId_++;
id = nextDatabaseId_;
pthread_mutex_unlock(&mutex_);
return id;
}
ResponseCode::type LSMServerHandler::
@ -104,8 +141,13 @@ insert(datatuple* tuple)
ResponseCode::type LSMServerHandler::
addDatabase(const std::string& databaseName)
{
datatuple* tup = buildTuple(0, databaseName, (void*)&nextDatabaseId_, (uint32_t)(sizeof(nextDatabaseId_)));
uint32_t id = nextDatabaseId();
datatuple* tup = buildTuple(0, databaseName, (void*)&id, (uint32_t)(sizeof(id)));
datatuple* ret = get(tup);
if (ret) {
datatuple::freetuple(ret);
return sherpa::ResponseCode::DatabaseExists;
}
return insert(tup);
}
@ -141,44 +183,62 @@ scan(RecordListResponse& _return, const std::string& databaseName, const ScanOrd
const std::string& endKey, const bool endKeyIncluded,
const int32_t maxRecords, const int32_t maxBytes)
{
#if 0
BdbIterator itr;
uint32_t id;
_return.responseCode = getDatabaseId(databaseName, id);
if (_return.responseCode != sherpa::ResponseCode::Ok) {
uint32_t id = getDatabaseId(databaseName);
if (id == 0) {
// database not found
_return.responseCode = sherpa::ResponseCode::DatabaseNotFound;
return;
}
boost::thread_specific_ptr<RecordBuffer> buffer;
if (buffer.get() == NULL) {
buffer.reset(new RecordBuffer(keyBufferSizeBytes_, valueBufferSizeBytes_));
}
datatuple* start = buildTuple(id, startKey);
datatuple* end;
if (endKey.empty()) {
insertDatabaseId(const_cast<std::string&>(endKey), id + 1);
end = buildTuple(id + 1, endKey);
} else {
insertDatabaseId(const_cast<std::string&>(endKey), id);
end = buildTuple(id, endKey);
}
insertDatabaseId(const_cast<std::string&>(startKey), id);
itr.init(db_[id % numPartitions_], const_cast<std::string&>(startKey), startKeyIncluded, const_cast<std::string&>(endKey), endKeyIncluded, order, *buffer);
logtable<datatuple>::iterator* itr = new logtable<datatuple>::iterator(ltable_, start);
int32_t resultSize = 0;
_return.responseCode = sherpa::ResponseCode::Ok;
while ((maxRecords == 0 || (int32_t)(_return.records.size()) < maxRecords) &&
(maxBytes == 0 || resultSize < maxBytes)) {
BdbIterator::ResponseCode rc = itr.next(*buffer);
if (rc == BdbIterator::ScanEnded) {
datatuple* current = itr->getnext();
if (current == NULL) {
_return.responseCode = sherpa::ResponseCode::ScanEnded;
break;
} else if (rc != BdbIterator::Ok) {
_return.responseCode = sherpa::ResponseCode::Error;
}
int cmp = datatuple::compare_obj(current, start);
std::cout << "start key: (" << std::string((const char*)(start->strippedkey()), start->strippedkeylen()) << ")" << std::endl;
std::cout << "current key: (" << std::string((const char*)(current->strippedkey()), current->strippedkeylen()) << ")" << std::endl;
std::cout << "start key: " << startKey << " cmp: " << cmp << std::endl;
if ((!startKeyIncluded) && cmp == 0) {
datatuple::freetuple(current);
continue;
}
// are we at the end of range?
cmp = datatuple::compare_obj(current, end);
std::cout << "end key: " << endKey << " cmp: " << cmp << std::endl;
if ((!endKeyIncluded && cmp >= 0) ||
(endKeyIncluded && cmp > 0)) {
datatuple::freetuple(current);
_return.responseCode = sherpa::ResponseCode::ScanEnded;
break;
}
Record rec;
rec.key.assign(buffer->getKeyBuffer() + sizeof(id), buffer->getKeySize() - sizeof(id));
rec.value.assign(buffer->getValueBuffer(), buffer->getValueSize());
int32_t keySize = current->strippedkeylen() - sizeof(id);
int32_t dataSize = current->datalen();
rec.key.assign((char*)(current->strippedkey()) + sizeof(id), keySize);
rec.value.assign((char*)(current->data()), dataSize);
_return.records.push_back(rec);
resultSize += buffer->getKeySize() + buffer->getValueSize();
resultSize += keySize + dataSize;
datatuple::freetuple(current);
}
#endif
delete itr;
}
datatuple* LSMServerHandler::
@ -288,24 +348,19 @@ update(const std::string& databaseName,
const std::string& recordName,
const std::string& recordBody)
{
/*
uint32_t id;
ResponseCode::type rc = getDatabaseId(databaseName, id);
if (rc != sherpa::ResponseCode::Ok) {
return rc;
uint32_t id = getDatabaseId(databaseName);
std::cout << "michim: enter update" << std::endl;
if (id == 0) {
return sherpa::ResponseCode::DatabaseNotFound;
}
insertDatabaseId(const_cast<std::string&>(recordName), id);
Bdb::ResponseCode dbrc = db_[id % numPartitions_]->update(recordName, recordBody);
if (dbrc == Bdb::Ok) {
return sherpa::ResponseCode::Ok;
} else if (dbrc == Bdb::KeyNotFound) {
datatuple* oldRecordBody = get(id, recordName);
if (oldRecordBody == NULL) {
return sherpa::ResponseCode::RecordNotFound;
} else {
return sherpa::ResponseCode::Error;
}
*/
return sherpa::ResponseCode::Error;
std::cout << "michim: updating record" << std::endl;
datatuple::freetuple(oldRecordBody);
datatuple* tup = buildTuple(id, recordName, recordBody);
return insert(tup);
}
ResponseCode::type LSMServerHandler::
@ -365,6 +420,6 @@ buildTuple(uint32_t databaseId, const std::string& recordName, const void* body,
*(uint32_t*)key = databaseId;
memcpy(((uint32_t*)key) + 1, recordName.c_str(), recordName.size());
datatuple *tup = datatuple::create(key, keySize, body, bodySize);
std::cout << "built tuple: key: (" << std::string((const char*)(tup->strippedkey()), tup->strippedkeylen()) << ")" << std::endl;
return tup;
}

View file

@ -30,11 +30,14 @@ public:
private:
ResponseCode::type insert(datatuple* tuple);
uint32_t getDatabaseId(const std::string& databaseName);
uint32_t nextDatabaseId();
datatuple* get(uint32_t databaseId, const std::string& recordName);
datatuple* get(datatuple* tuple);
datatuple* buildTuple(uint32_t databaseId, const std::string& recordName);
datatuple* buildTuple(uint32_t databaseId, const std::string& recordName, const std::string& recordBody);
datatuple* buildTuple(uint32_t databaseId, const std::string& recordName, const void* body, uint32_t bodySize);
void initNextDatabaseId();
logtable<datatuple>* ltable_;
uint32_t nextDatabaseId_;
pthread_mutex_t mutex_;
};

View file

@ -19,44 +19,97 @@ int main(int argc, char **argv) {
socket->setNoDelay(true);
sherpa::BinaryResponse getResponse;
sherpa::RecordListResponse scanResponse;
cout << client.addDatabase("michi0") << endl;;
cout << client.insert("michi0", "k1", "v1") << endl;
cout << client.insert("michi0", "k2", "v1") << endl;
cout << client.insert("michi0", "k3", "v1") << endl;
client.get(getResponse, "michi0", "k1");
std::string db = "db";
std::string db1 = "db1";
cout << client.addDatabase(db) << endl;;
cout << client.insert(db, "kkkkkkkkkkkkk1", "v1") << endl;
cout << client.insert(db, "kkkkkkkkkkkkk2", "v2") << endl;
cout << client.insert(db, "kkkkkkkkkkkkk3", "v3") << endl;
client.get(getResponse, db, "kkkkkkkkkkkkk1");
cout << getResponse.responseCode << endl;
cout << getResponse.value << endl;
client.get(getResponse, "michi0", "k2");
client.get(getResponse, db, "kkkkkkkkkkkkk2");
cout << getResponse.responseCode << endl;
cout << getResponse.value << endl;
client.get(getResponse, "michi0", "k3");
client.get(getResponse, db, "kkkkkkkkkkkkk3");
cout << getResponse.responseCode << endl;
cout << getResponse.value << endl;
client.get(getResponse, "fdsafdasfdasfdsaf", "k3");
cout << client.update(db, "k0", "v11") << endl;
cout << client.update(db, "kkkkkkkkkkkkk1", "v11") << endl;
cout << client.update(db, "kkkkkkkkkkkkk2", "v12") << endl;
cout << client.update(db, "kkkkkkkkkkkkk3", "v13") << endl;
client.get(getResponse, db, "kkkkkkkkkkkkk1");
cout << getResponse.responseCode << endl;
cout << getResponse.value << endl;
client.get(getResponse, db, "kkkkkkkkkkkkk2");
cout << getResponse.responseCode << endl;
cout << getResponse.value << endl;
client.get(getResponse, db, "kkkkkkkkkkkkk3");
cout << getResponse.responseCode << endl;
cout << getResponse.value << endl;
/*
client.get(getResponse, "fdsafdasfdasfdsaf", "kkkkkkkkkkkkk3");
cout << getResponse.responseCode << endl;
client.get(getResponse, "michi0", "k4");
client.get(getResponse, db, "k4");
cout << getResponse.responseCode << endl;
return 0;
*/
client.scan(scanResponse, "michi", sherpa::ScanOrder::Ascending, "", true, "", true, 100, 100);
client.scan(scanResponse, db, sherpa::ScanOrder::Ascending, "", true, "", true, 100, 100);
std::vector<sherpa::Record>::iterator itr;
for (itr = scanResponse.records.begin(); itr != scanResponse.records.end(); itr++) {
cout << itr->key << " " << itr->value << endl;
}
std::cout << std::endl;
client.scan(scanResponse, "michi", sherpa::ScanOrder::Descending, "", true, "", true, 100, 100);
client.scan(scanResponse, db, sherpa::ScanOrder::Ascending, "kkkkkkkkkkkkk1", false, "kkkkkkkkkkkkk3", false, 100, 100);
for (itr = scanResponse.records.begin(); itr != scanResponse.records.end(); itr++) {
cout << itr->key << " " << itr->value << endl;
}
std::cout << std::endl;
client.scan(scanResponse, db, sherpa::ScanOrder::Ascending, "kkkkkkkkkkkkk1", true, "kkkkkkkkkkkkk3", true, 100, 100);
for (itr = scanResponse.records.begin(); itr != scanResponse.records.end(); itr++) {
cout << itr->key << " " << itr->value << endl;
}
std::cout << std::endl;
client.scan(scanResponse, db, sherpa::ScanOrder::Ascending, "kkkkkkkkkkkkk2", true, "kkkkkkkkkkkkk2", true, 100, 100);
for (itr = scanResponse.records.begin(); itr != scanResponse.records.end(); itr++) {
cout << itr->key << " " << itr->value << endl;
}
std::cout << std::endl;
client.scan(scanResponse, db, sherpa::ScanOrder::Ascending, "k", true, "kkkkkkkkkkkkk4", true, 100, 100);
for (itr = scanResponse.records.begin(); itr != scanResponse.records.end(); itr++) {
cout << itr->key << " " << itr->value << endl;
}
std::cout << std::endl;
cout << "adding db one more time" << endl;
cout << client.addDatabase(db) << endl;;
cout << client.addDatabase(db1) << endl;;
cout << client.insert(db1, "new key", "new value") << endl;
client.get(getResponse, db1, "new key");
cout << getResponse.responseCode << endl;
cout << getResponse.value << endl;
client.get(getResponse, db, "new key");
cout << getResponse.responseCode << endl;
cout << getResponse.value << endl;
return 0;
/*
cout << client.remove("michi", "k1") << endl;
client.get(getResponse, "michi", "k1");
cout << getResponse.responseCode << endl;
transport->close();
*/
return 0;
}