change cerr to log4cpp

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@1204 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2010-09-29 17:58:34 +00:00
parent e5518eac26
commit 4c0e8a7186
5 changed files with 96 additions and 101 deletions

View file

@ -85,10 +85,10 @@ template<class TUPLE>
void logtable<TUPLE>::init_stasis() { void logtable<TUPLE>::init_stasis() {
DataPage<datatuple>::register_stasis_page_impl(); DataPage<datatuple>::register_stasis_page_impl();
stasis_buffer_manager_size = 768 * 1024; // 4GB = 2^10 pages:
// XXX Workaround Stasis' (still broken) default concurrent buffer manager // XXX Workaround Stasis' (still broken) default concurrent buffer manager
stasis_buffer_manager_size = 1024 * 1024; // 4GB = 2^10 pages:
//stasis_buffer_manager_factory = stasis_buffer_manager_hash_factory; //stasis_buffer_manager_factory = stasis_buffer_manager_hash_factory;
stasis_buffer_manager_hint_writes_are_sequential = 1;
Tinit(); Tinit();
} }

View file

@ -3,58 +3,64 @@
#include "TabletMetadata.h" #include "TabletMetadata.h"
#include "tcpclient.h" #include "tcpclient.h"
#include <fstream> #include <dht/LogUtils.h>
#include <iostream>
// Initialize the logger
static log4cpp::Category &log =
log4cpp::Category::getInstance("dht.su."__FILE__);
class LSMPersistentParent : PersistentParent { class LSMPersistentParent : PersistentParent {
public: public:
LSMPersistentParent() : ordered(true), hashed(false) { LSMPersistentParent() : ordered(true), hashed(false) {
} // we ignore the ordered flag... } // we ignore the ordered flag...
~LSMPersistentParent() {
DHT_DEBUG_STREAM() << "~LSMPersistentParent called";
}
SuCode::ResponseCode install(const SectionConfig& config) { SuCode::ResponseCode install(const SectionConfig& config) {
ordered.filestr << "LSM install called" << std::endl; DHT_DEBUG_STREAM() << "LSM install called";
return SuCode::SuOk; return SuCode::SuOk;
} }
SuCode::ResponseCode init(const SectionConfig& config) { SuCode::ResponseCode init(const SectionConfig& config) {
ordered.filestr << "LSM init called" << std::endl; DHT_DEBUG_STREAM() << "LSM init called";
ordered.init(config); ordered.init(config);
hashed.init(config); hashed.init(config);
return SuCode::SuOk; return SuCode::SuOk;
} }
PersistentStore *getHashedStore() { PersistentStore *getHashedStore() {
ordered.filestr << "LSM getHashedStore called" << std::endl; DHT_DEBUG_STREAM() << "LSM getHashedStore called";
return &hashed; return &hashed;
} }
PersistentStore *getOrderedStore() { PersistentStore *getOrderedStore() {
ordered.filestr << "LSM getOrderedStore called" << std::endl; DHT_DEBUG_STREAM() << "LSM getOrderedStore called";
return &ordered; return &ordered;
} }
bool ping() { bool ping() {
ordered.filestr << "LSM ping called" << std::endl; DHT_DEBUG_STREAM() << "LSM ping called";
return true; return ordered.ping();
} // XXX call OP_DBG_NOOP }
std::string getName() { std::string getName() {
ordered.filestr << "LSM getName called" << std::endl; DHT_DEBUG_STREAM() << "LSM getName called";
return "logstore"; return "logstore";
} }
SuCode::ResponseCode getFreeSpaceBytes(double & freeSpaceBytes) { SuCode::ResponseCode getFreeSpaceBytes(double & freeSpaceBytes) {
ordered.filestr << "LSM getFreeSpaceBytes called" << std::endl; DHT_DEBUG_STREAM() << "LSM getFreeSpaceBytes called";
freeSpaceBytes = 1024.0 *1024.0 * 1024.0; // XXX stub freeSpaceBytes = 1024.0 *1024.0 * 1024.0; // XXX stub
return SuCode::SuOk; return SuCode::SuOk;
} }
SuCode::ResponseCode getDiskMaxBytes(double & diskMaxBytes) { SuCode::ResponseCode getDiskMaxBytes(double & diskMaxBytes) {
ordered.filestr << "LSM getDiskMaxBytes called" << std::endl; DHT_DEBUG_STREAM() << "LSM getDiskMaxBytes called";
diskMaxBytes = 10.0 * 1024.0 *1024.0 * 1024.0; // XXX stub diskMaxBytes = 10.0 * 1024.0 *1024.0 * 1024.0; // XXX stub
return SuCode::SuOk; return SuCode::SuOk;
} }
SuCode::ResponseCode cleanupTablet(uint64_t uniqId, SuCode::ResponseCode cleanupTablet(uint64_t uniqId,
const std::string & tableName, const std::string & tableName,
const std::string & tabletName) { const std::string & tabletName) {
ordered.filestr << "LSM cleanupTablet called" << std::endl; DHT_DEBUG_STREAM() << "LSM cleanupTablet called";
return SuCode::SuOk; // XXX stub return SuCode::SuOk; // XXX stub
} }
SuCode::ResponseCode getTabletMappingList(TabletList & tabletList) { SuCode::ResponseCode getTabletMappingList(TabletList & tabletList) {
ordered.filestr << "LSM getTabletMappingList called" << std::endl; DHT_DEBUG_STREAM() << "LSM getTabletMappingList called";
std::string metadata_table = std::string("ydht_metadata_table"); std::string metadata_table = std::string("ydht_metadata_table");
std::string metadata_tablet = std::string("0"); std::string metadata_tablet = std::string("0");
@ -63,30 +69,24 @@ public:
size_t startlen; size_t startlen;
size_t endlen; size_t endlen;
ordered.filestr << "getTabletMappingList C" << std::endl;
unsigned char * start_tup = ordered.my_strcat(metadata_table, metadata_tablet, "", &startlen); unsigned char * start_tup = ordered.my_strcat(metadata_table, metadata_tablet, "", &startlen);
unsigned char * end_tup = ordered.my_strcat(metadata_table, metadata_tabletEnd, "", &endlen); unsigned char * end_tup = ordered.my_strcat(metadata_table, metadata_tabletEnd, "", &endlen);
ordered.filestr << "start tup = " << start_tup << std::endl; DHT_DEBUG_STREAM() << "start tup = " << start_tup;
ordered.filestr << "end tup = " << end_tup << std::endl; DHT_DEBUG_STREAM() << "end tup = " << end_tup;
datatuple * starttup = datatuple::create(start_tup, startlen); datatuple * starttup = datatuple::create(start_tup, startlen);
datatuple * endtup = datatuple::create(end_tup, endlen); datatuple * endtup = datatuple::create(end_tup, endlen);
free(start_tup); free(start_tup);
free(end_tup); free(end_tup);
ordered.filestr << "getTabletMappingList B conn = " << ordered.l_ << std::endl;
uint8_t rcode = logstore_client_op_returns_many(ordered.l_, OP_SCAN, starttup, endtup, 0); // 0 = no limit. uint8_t rcode = logstore_client_op_returns_many(ordered.l_, OP_SCAN, starttup, endtup, 0); // 0 = no limit.
ordered.filestr << "getTabletMappingList A'" << std::endl;
datatuple::freetuple(starttup); datatuple::freetuple(starttup);
datatuple::freetuple(endtup); datatuple::freetuple(endtup);
datatuple * next; datatuple * next;
ordered.filestr << "getTabletMappingList A" << std::endl;
ordered.filestr.flush();
TabletMetadata m; TabletMetadata m;
if(rcode == LOGSTORE_RESPONSE_SENDING_TUPLES) { if(rcode == LOGSTORE_RESPONSE_SENDING_TUPLES) {
@ -94,16 +94,20 @@ public:
ordered.metadata_buf(m, next->key(), next->keylen()); ordered.metadata_buf(m, next->key(), next->keylen());
struct ydht_maptable_schema md; struct ydht_maptable_schema md;
std::string cat = m.table() + m.tablet();
md.uniq_id = 0; md.uniq_id = 0;
for(int i = 0; i < cat.length(); i++) {
md.uniq_id += ((unsigned char)cat[i]); // XXX obviously, this is a terrible hack (and a poor hash function)
}
md.tableName = m.table(); md.tableName = m.table();
md.tabletName = m.tablet(); md.tabletName = m.tablet();
ordered.filestr << md.tableName << " : " << md.tabletName << std::endl; DHT_DEBUG_STREAM() << md.uniq_id << " : " << md.tableName << " : " << md.tabletName;
tabletList.push_back(md); tabletList.push_back(md);
datatuple::freetuple(next); datatuple::freetuple(next);
} }
ordered.filestr << "getTabletMappingListreturns" << std::endl; DHT_DEBUG_STREAM() << "getTabletMappingListreturns";
} else { } else {
ordered.filestr << "error " << (int)rcode << " in getTabletMappingList." << std::endl; DHT_ERROR_STREAM() << "error " << (int)rcode << " in getTabletMappingList.";
return SuCode::PStoreUnexpectedError; // XXX should be "connection closed error" or something... return SuCode::PStoreUnexpectedError; // XXX should be "connection closed error" or something...
} }
@ -112,7 +116,7 @@ public:
SuCode::ResponseCode getApproximateTableSize SuCode::ResponseCode getApproximateTableSize
(const std::string& tableId, (const std::string& tableId,
int64_t& tableSize, int64_t & rowCount) { int64_t& tableSize, int64_t & rowCount) {
ordered.filestr << "LSM getApproximateTableSize called" << std::endl; DHT_DEBUG_STREAM() << "LSM getApproximateTableSize called";
return ordered.getApproximateTableSize(tableId, tableSize, rowCount); return ordered.getApproximateTableSize(tableId, tableSize, rowCount);
} }
private: private:

View file

@ -12,15 +12,20 @@
#include "SuLimits.h" #include "SuLimits.h"
#include "dht/UtilityBuffer.h" #include "dht/UtilityBuffer.h"
//#include <dht/LogUtils.h> #include <dht/LogUtils.h>
#define DHT_DEBUG_STREAM() std::cerr
#define RESPONSE_ERROR_STREAM(x) std::cerr
#include "LSMPersistentStoreImpl.h" #include "LSMPersistentStoreImpl.h"
#include <tcpclient.h> #include <tcpclient.h>
// Initialize the logger
static log4cpp::Category &log =
log4cpp::Category::getInstance("dht.su."__FILE__);
class LSMIterator : public TabletIterator<StorageRecord> { class LSMIterator : public TabletIterator<StorageRecord> {
friend class LSMPersistentStoreImpl; friend class LSMPersistentStoreImpl;
public: public:
// StorageRecord * data; // <- defined in parent class. next() updates it. // StorageRecord * data; // <- defined in parent class. next() updates it.
@ -52,17 +57,17 @@ public:
} else { } else {
end_key = lsmImpl->buf_key(tabletMeta, "", &end_key_len); end_key = lsmImpl->buf_key(tabletMeta, "", &end_key_len);
if(end_key[end_key_len-2] != low_eos) { if(end_key[end_key_len-2] != low_eos) {
lsmImpl->filestr << "ERROR CORRUPT lsm tablet key = " << (char*)end_key << std::endl; DHT_ERROR_STREAM() << "CORRUPT lsm tablet key = " << (char*)end_key;
} else { } else {
end_key[end_key_len-2] = high_eos; end_key[end_key_len-2] = high_eos;
} }
} }
} else { } else {
lsmImpl->filestr << "WARNING: Scanning hash table, but ignoring contiunation range!" << std::endl; DHT_WARN_STREAM() << "Scanning hash table, but ignoring contiunation range!";
start_key = lsmImpl->buf_key(tabletMeta, "", &start_key_len); start_key = lsmImpl->buf_key(tabletMeta, "", &start_key_len);
end_key = lsmImpl->buf_key(tabletMeta, "", &end_key_len); end_key = lsmImpl->buf_key(tabletMeta, "", &end_key_len);
if(end_key[end_key_len-2] != low_eos) { if(end_key[end_key_len-2] != low_eos) {
lsmImpl->filestr << "ERROR CORRUPT lsm tablet key = " << (char*)end_key << std::endl; DHT_ERROR_STREAM() << "CORRUPT lsm tablet key = " << (char*)end_key;
} else { } else {
end_key[end_key_len-2] = high_eos; end_key[end_key_len-2] = high_eos;
} }
@ -70,11 +75,11 @@ public:
starttup = datatuple::create(start_key, start_key_len); starttup = datatuple::create(start_key, start_key_len);
std::string dbg((char*)start_key, start_key_len - 1); std::string dbg((char*)start_key, start_key_len - 1);
lsmImpl->filestr << "start lsm key = " << dbg << std::endl; DHT_DEBUG_STREAM() << "start lsm key = " << dbg;
endtup = datatuple::create(end_key, end_key_len); endtup = datatuple::create(end_key, end_key_len);
std::string dbg2((char*)end_key, end_key_len - 1); std::string dbg2((char*)end_key, end_key_len - 1);
lsmImpl->filestr << "end lsm key = " << dbg2 << std::endl; DHT_DEBUG_STREAM() << "end lsm key = " << dbg2;
uint8_t rc = logstore_client_op_returns_many(lsmImpl->scan_l_, OP_SCAN, starttup, endtup, scanLimit); uint8_t rc = logstore_client_op_returns_many(lsmImpl->scan_l_, OP_SCAN, starttup, endtup, scanLimit);
@ -90,25 +95,25 @@ public:
this->data = new StorageRecord(); this->data = new StorageRecord();
} }
~LSMIterator() { ~LSMIterator() {
lsmImpl->filestr << "close iterator called" << std::endl; DHT_DEBUG_STREAM() << "close iterator called";
// close iterator by running to the end of it... TODO devise a better way to close iterators early? // close iterator by running to the end of it... TODO devise a better way to close iterators early?
while(this->data) { while(this->data) {
next(); next();
} }
lsmImpl->filestr << "close iterator done" << std::endl; DHT_DEBUG_STREAM() << "close iterator done";
} }
SuCode::ResponseCode next() { SuCode::ResponseCode next() {
datatuple * tup; datatuple * tup;
lsmImpl->filestr << "next called" << std::endl; DHT_DEBUG_STREAM() << "next called";
if(error) { // only catches errors during scan setup. if(error) { // only catches errors during scan setup.
return SuCode::PStoreUnexpectedError; return SuCode::PStoreUnexpectedError;
} else if((tup = logstore_client_next_tuple(lsmImpl->scan_l_))) { } else if((tup = logstore_client_next_tuple(lsmImpl->scan_l_))) {
lsmImpl->filestr << "found tuple, key = " << tup->key() << " datalen = " << tup->datalen() << std::endl; DHT_DEBUG_STREAM() << "found tuple, key = " << tup->key() << " datalen = " << tup->datalen();
SuCode::ResponseCode rc = lsmImpl->tup_buf(*(this->data), tup); SuCode::ResponseCode rc = lsmImpl->tup_buf(*(this->data), tup);
datatuple::freetuple(tup); datatuple::freetuple(tup);
return rc; return rc;
} else { } else {
lsmImpl->filestr << "no tuple" << std::endl; DHT_DEBUG_STREAM() << "no tuple";
delete this->data; delete this->data;
this->data = NULL; this->data = NULL;
return SuCode::PStoreScanEnd; // XXX need to differentiate between end of scan and failure return SuCode::PStoreScanEnd; // XXX need to differentiate between end of scan and failure
@ -119,10 +124,6 @@ private:
uint8_t error; uint8_t error;
}; };
// Initialize the logger
//static log4cpp::Category &log =
// log4cpp::Category::getInstance("dht.su."__FILE__);
void LSMPersistentStoreImpl::buf_metadata(unsigned char ** buf, size_t *len, const TabletMetadata &m) { void LSMPersistentStoreImpl::buf_metadata(unsigned char ** buf, size_t *len, const TabletMetadata &m) {
std::string ydht_metadata_table = std::string("ydht_metadata_table"); std::string ydht_metadata_table = std::string("ydht_metadata_table");
std::string zero = std::string("0"); std::string zero = std::string("0");
@ -137,11 +138,13 @@ void LSMPersistentStoreImpl::metadata_buf(TabletMetadata &m, const unsigned char
// Metadata table key format: // Metadata table key format:
// ydht_metadata_table[low_eos]0[low_eos]table[low_eos]tablet[low_eos][low_eos] // ydht_metadata_table[low_eos]0[low_eos]table[low_eos]tablet[low_eos][low_eos]
assert(buf);
std::string ydht_metadata_table, zero, tmp, table, tablet, empty; std::string ydht_metadata_table, zero, tmp, table, tablet, empty;
my_strtok(buf, len, ydht_metadata_table, zero, tmp); my_strtok(buf, len, ydht_metadata_table, zero, tmp);
assert(tmp.c_str());
my_strtok((const unsigned char*)tmp.c_str(), tmp.length(), table, tablet, empty); my_strtok((const unsigned char*)tmp.c_str(), tmp.length(), table, tablet, empty);
filestr << "Parsed metadata: [" << table << "] [" << tablet << "] [" << empty << "](empty)" << std::endl; DHT_DEBUG_STREAM() << "Parsed metadata: [" << table << "] [" << tablet << "] [" << empty << "](empty)";
m.setTable(table); m.setTable(table);
m.setTablet(tablet); m.setTablet(tablet);
m.setTabletId(tmp.substr(0, tmp.length() - 1)); m.setTabletId(tmp.substr(0, tmp.length() - 1));
@ -258,7 +261,7 @@ LSMPersistentStoreImpl::key_buf(StorageRecord& ret,
const unsigned char * buf, size_t buf_len) { const unsigned char * buf, size_t buf_len) {
std::string table, tablet, key; std::string table, tablet, key;
my_strtok(buf, buf_len, table, tablet, key); my_strtok(buf, buf_len, table, tablet, key);
filestr << "key_buf parsed datatuple key: table = [" << table << "] tablet = [" << tablet << "] key = [" << key << "]" << std::endl; DHT_DEBUG_STREAM() << "key_buf parsed datatuple key: table = [" << table << "] tablet = [" << tablet << "] key = [" << key << "]";
ret.recordKey().setName(key); ret.recordKey().setName(key);
return SuCode::SuOk; return SuCode::SuOk;
} }
@ -298,22 +301,24 @@ LSMPersistentStoreImpl::val_buf(StorageRecord& ret,
ret.dataBlob().setDataSize(dataBlob_len); ret.dataBlob().setDataSize(dataBlob_len);
return SuCode::SuOk; return SuCode::SuOk;
} }
LSMPersistentStoreImpl:: LSMPersistentStoreImpl::
LSMPersistentStoreImpl(bool isOrdered) : isOrdered_(isOrdered), l_(NULL), scan_l_(NULL) { LSMPersistentStoreImpl(bool isOrdered) : isOrdered_(isOrdered), l_(NULL), scan_l_(NULL) {
filestr.open(isOrdered? "/tmp/lsm-log" : "/tmp/lsm-log-hashed", std::fstream::out | std::fstream::app); // filestr.open(isOrdered? "/tmp/lsm-log" : "/tmp/lsm-log-hashed", std::fstream::out | std::fstream::app);
filestr << "LSMP constructor called" << std::endl; // It would be unsafe to call the following, since we're statically initialized: DHT_DEBUG_STREAM() << "LSMP constructor called";
} }
LSMPersistentStoreImpl:: LSMPersistentStoreImpl::
~LSMPersistentStoreImpl() ~LSMPersistentStoreImpl()
{ {
filestr << "LSMP destructor called" << std::endl; DHT_DEBUG_STREAM() << "LSMP destructor called";
if(l_) logstore_client_close(l_); if(l_) logstore_client_close(l_);
if(scan_l_) logstore_client_close(scan_l_); if(scan_l_) logstore_client_close(scan_l_);
DHT_DEBUG_STREAM() << "LSMP destructor cleanly closed connections";
} }
SuCode::ResponseCode LSMPersistentStoreImpl::initMetadataMetadata(TabletMetadata& m) { SuCode::ResponseCode LSMPersistentStoreImpl::initMetadataMetadata(TabletMetadata& m) {
filestr << "LSMP initMetadataMetadata called [" << m << "] " << std::endl; DHT_DEBUG_STREAM() << "LSMP initMetadataMetadata called";
std::string metadata_table = std::string("ydht_metadata_table"); std::string metadata_table = std::string("ydht_metadata_table");
std::string metadata_tablet= std::string("0"); std::string metadata_tablet= std::string("0");
@ -328,7 +333,7 @@ SuCode::ResponseCode LSMPersistentStoreImpl::initMetadataMetadata(TabletMetadata
SuCode::ResponseCode LSMPersistentStoreImpl:: SuCode::ResponseCode LSMPersistentStoreImpl::
init(const SectionConfig &config) init(const SectionConfig &config)
{ {
filestr << "LSMP init called" << std::endl; DHT_DEBUG_STREAM() << "LSMP init called";
if(!l_) { // workaround bug 2870547 if(!l_) { // workaround bug 2870547
l_ = logstore_client_open("localhost", 32432, 60); // XXX hardcode none of these values l_ = logstore_client_open("localhost", 32432, 60); // XXX hardcode none of these values
scan_l_ = logstore_client_open("localhost", 32432, 60); // XXX hardcode none of these values scan_l_ = logstore_client_open("localhost", 32432, 60); // XXX hardcode none of these values
@ -338,14 +343,14 @@ init(const SectionConfig &config)
bool LSMPersistentStoreImpl:: bool LSMPersistentStoreImpl::
isOrdered(){ isOrdered(){
filestr << "LSMP isOrdered called" << std::endl; DHT_DEBUG_STREAM() << "LSMP isOrdered called";
return isOrdered_; return isOrdered_;
} }
SuCode::ResponseCode LSMPersistentStoreImpl:: SuCode::ResponseCode LSMPersistentStoreImpl::
addEmptyTablet(TabletMetadata& tabletMeta) addEmptyTablet(TabletMetadata& tabletMeta)
{ {
filestr << "LSMP addEmptyTablet called" << std::endl; DHT_DEBUG_STREAM() << "LSMP addEmptyTablet called";
// This is a no-op; we'll simply prepend the tablet string to each record. // This is a no-op; we'll simply prepend the tablet string to each record.
{ {
// Is table name too long? // Is table name too long?
@ -357,7 +362,7 @@ addEmptyTablet(TabletMetadata& tabletMeta)
if (mySQLTableName!=""){ if (mySQLTableName!=""){
filestr << "Tablet " << mySQLTableName << " already exists!" << std::endl; DHT_INFO_STREAM() << "Tablet " << mySQLTableName << " already exists!";
return SuCode::PStoreTabletAlreadyExists; return SuCode::PStoreTabletAlreadyExists;
} else { } else {
size_t keylen; unsigned char * key; size_t keylen; unsigned char * key;
@ -372,8 +377,7 @@ addEmptyTablet(TabletMetadata& tabletMeta)
SuCode::ResponseCode LSMPersistentStoreImpl:: SuCode::ResponseCode LSMPersistentStoreImpl::
dropTablet(TabletMetadata& tabletMeta) dropTablet(TabletMetadata& tabletMeta)
{ {
filestr << "LSMP dropTablet called" << std::endl; DHT_INFO_STREAM() << "dropTablet called. Falling back on clearTabletRange()";
DHT_DEBUG_STREAM() << "dropTablet called. Falling back on clearTabletRange()";
SuCode::ResponseCode ret = clearTabletRange(tabletMeta, 0); SuCode::ResponseCode ret = clearTabletRange(tabletMeta, 0);
size_t keylen; size_t keylen;
@ -388,10 +392,10 @@ dropTablet(TabletMetadata& tabletMeta)
tabletMeta.setTabletId(""); tabletMeta.setTabletId("");
if(!result) { if(!result) {
filestr << "LSMP dropTablet fails" << std::endl; DHT_WARN_STREAM() << "LSMP dropTablet fails";
ret = SuCode::PStoreTabletCleanupFailed; ret = SuCode::PStoreTabletCleanupFailed;
} else { } else {
filestr << "LSMP dropTablet succeeds" << std::endl; DHT_INFO_STREAM() << "LSMP dropTablet succeeds";
ret = SuCode::SuOk; ret = SuCode::SuOk;
} }
return ret; return ret;
@ -403,8 +407,7 @@ dropTablet(TabletMetadata& tabletMeta)
SuCode::ResponseCode LSMPersistentStoreImpl:: SuCode::ResponseCode LSMPersistentStoreImpl::
clearTabletRange(TabletMetadata& tabletMeta, uint32_t removalLimit) clearTabletRange(TabletMetadata& tabletMeta, uint32_t removalLimit)
{ {
filestr << "LSMP clearTabletRange called" << std::endl; DHT_WARN_STREAM() << "clear tablet range is unimplemented. ignoring request";
DHT_DEBUG_STREAM() << "clear tablet range is unimplemented. ignoring request";
return SuCode::SuOk; return SuCode::SuOk;
} }
@ -412,8 +415,7 @@ SuCode::ResponseCode LSMPersistentStoreImpl::
getApproximateTableSize(std::string tabletMeta, getApproximateTableSize(std::string tabletMeta,
int64_t& tableSize, int64_t& tableSize,
int64_t & rowCount) { int64_t & rowCount) {
filestr << "LSMP getApproximateTableSize called" << std::endl; DHT_WARN_STREAM() << "get approximate table size is unimplemented. returning dummy values";
DHT_DEBUG_STREAM() << "get approximate table size is unimplemented. returning dummy values";
tableSize = 1024 * 1024 * 1024; tableSize = 1024 * 1024 * 1024;
rowCount = 1024 * 1024; rowCount = 1024 * 1024;
return SuCode::SuOk; return SuCode::SuOk;
@ -423,7 +425,7 @@ getApproximateTableSize(TabletMetadata& tabletMeta,
int64_t& tableSize, int64_t& tableSize,
int64_t & rowCount) int64_t & rowCount)
{ {
filestr << "LSMP getApproximateTableSize (2) called" << std::endl; DHT_DEBUG_STREAM() << "LSMP getApproximateTableSize (2) called";
return getApproximateTableSize(tabletMeta.getTabletId(), tableSize, rowCount); return getApproximateTableSize(tabletMeta.getTabletId(), tableSize, rowCount);
} }
@ -431,7 +433,7 @@ getApproximateTableSize(TabletMetadata& tabletMeta,
SuCode::ResponseCode LSMPersistentStoreImpl:: SuCode::ResponseCode LSMPersistentStoreImpl::
get(const TabletMetadata& tabletMeta, StorageRecord& recordData) get(const TabletMetadata& tabletMeta, StorageRecord& recordData)
{ {
filestr << "LSMP get called" << tabletMeta.getTabletId() << ":" << recordData.recordKey()<< std::endl; DHT_DEBUG_STREAM() << "LSMP get called" << tabletMeta.getTabletId() << ":" << recordData.recordKey();
if(recordData.recordKey().name().length() > (isOrdered_ ? SuLimits::MAX_ORDERED_RECORD_NAME_LENGTH : SuLimits::MAX_RECORD_NAME_LENGTH)) { if(recordData.recordKey().name().length() > (isOrdered_ ? SuLimits::MAX_ORDERED_RECORD_NAME_LENGTH : SuLimits::MAX_RECORD_NAME_LENGTH)) {
return SuCode::PStoreIOFailed; return SuCode::PStoreIOFailed;
} }
@ -445,20 +447,20 @@ get(const TabletMetadata& tabletMeta, StorageRecord& recordData)
if((!result) || result->isDelete()) { if((!result) || result->isDelete()) {
ret = SuCode::PStoreRecordNotFound; ret = SuCode::PStoreRecordNotFound;
} else { } else {
DHT_DEBUG_STREAM() << "call val buf from get, data len = " << result->datalen() << std::endl; //DHT_DEBUG_STREAM() << "call val buf from get, data len = " << result->datalen() << std::endl;
ret = val_buf(recordData, result->data(), result->datalen()); ret = val_buf(recordData, result->data(), result->datalen());
} }
if(result) { if(result) {
datatuple::freetuple(result); datatuple::freetuple(result);
} }
filestr << "LSMP get returns succ = " << (ret == SuCode::SuOk) << std::endl; DHT_DEBUG_STREAM() << "LSMP get returns succ = " << (ret == SuCode::SuOk);
return ret; return ret;
} }
SuCode::ResponseCode LSMPersistentStoreImpl:: SuCode::ResponseCode LSMPersistentStoreImpl::
update(const TabletMetadata& tabletMeta, const StorageRecord& updateData) update(const TabletMetadata& tabletMeta, const StorageRecord& updateData)
{ {
filestr << "LSMP update called" << std::endl; DHT_DEBUG_STREAM() << "LSMP update called";
if(updateData.recordKey().name().length() > (isOrdered_ ? SuLimits::MAX_ORDERED_RECORD_NAME_LENGTH : SuLimits::MAX_RECORD_NAME_LENGTH)) { if(updateData.recordKey().name().length() > (isOrdered_ ? SuLimits::MAX_ORDERED_RECORD_NAME_LENGTH : SuLimits::MAX_RECORD_NAME_LENGTH)) {
return SuCode::PStoreIOFailed; return SuCode::PStoreIOFailed;
} }
@ -489,38 +491,35 @@ update(const TabletMetadata& tabletMeta, const StorageRecord& updateData)
SuCode::ResponseCode LSMPersistentStoreImpl:: // XXX what to do about update? SuCode::ResponseCode LSMPersistentStoreImpl:: // XXX what to do about update?
insert(const TabletMetadata& tabletMeta, insert(const TabletMetadata& tabletMeta,
const StorageRecord& insertData) { const StorageRecord& insertData) {
filestr << "LSMP insert called" << tabletMeta.getTabletId() << ":" << insertData.recordKey()<< std::endl; DHT_DEBUG_STREAM()<< "LSMP insert called" << tabletMeta.getTabletId() << ":" << insertData.recordKey();
size_t keybuflen, valbuflen; size_t keybuflen, valbuflen;
if(insertData.recordKey().name().length() > (isOrdered_ ? SuLimits::MAX_ORDERED_RECORD_NAME_LENGTH : SuLimits::MAX_RECORD_NAME_LENGTH)) { if(insertData.recordKey().name().length() > (isOrdered_ ? SuLimits::MAX_ORDERED_RECORD_NAME_LENGTH : SuLimits::MAX_RECORD_NAME_LENGTH)) {
return SuCode::PStoreIOFailed; return SuCode::PStoreIOFailed;
} }
unsigned char * keybuf = buf_key(tabletMeta, insertData.recordKey(), &keybuflen); unsigned char * keybuf = buf_key(tabletMeta, insertData.recordKey(), &keybuflen);
filestr << "keybuf = " << keybuf << " (and perhaps a null)" << std::endl; DHT_DEBUG_STREAM() << "keybuf = " << keybuf << " (and perhaps a null)";
unsigned char * valbuf = buf_val(insertData, &valbuflen); unsigned char * valbuf = buf_val(insertData, &valbuflen);
filestr << "valbuf = " << valbuf << " (and perhaps a null)" << std::endl; DHT_DEBUG_STREAM() << "valbuf = " << valbuf << " (and perhaps a null)";
datatuple * key_ins = datatuple::create(keybuf, keybuflen, valbuf, valbuflen); datatuple * key_ins = datatuple::create(keybuf, keybuflen, valbuf, valbuflen);
filestr << "insert create()" << std::endl; DHT_DEBUG_STREAM() << "insert create()";
void * result = (void*)logstore_client_op(l_, OP_INSERT, key_ins); void * result = (void*)logstore_client_op(l_, OP_INSERT, key_ins);
filestr << "insert insert()" << std::endl; DHT_DEBUG_STREAM() << "insert insert()";
if(result) { if(result) {
filestr << "LSMP insert will return result = " << result << std::endl; DHT_DEBUG_STREAM() << "LSMP insert will return result = " << result;
} else { } else {
filestr << "LSMP insert will return null "<< std::endl; DHT_DEBUG_STREAM() << "LSMP insert will return null ";
} }
datatuple::freetuple(key_ins); datatuple::freetuple(key_ins);
filestr << "insert free(key_ins)" << std::endl;
free(keybuf); free(keybuf);
filestr << "insert free(keybuf)" << std::endl;
free(valbuf); free(valbuf);
filestr << "insert free(valbuf)" << std::endl; DHT_DEBUG_STREAM() << "LSMP insert returns ";
filestr << "LSMP insert returns "<< std::endl;
return result ? SuCode::SuOk : SuCode::PStoreUnexpectedError; return result ? SuCode::SuOk : SuCode::PStoreUnexpectedError;
} }
SuCode::ResponseCode LSMPersistentStoreImpl:: SuCode::ResponseCode LSMPersistentStoreImpl::
remove(const TabletMetadata& tabletMeta, const RecordKey& recordName) remove(const TabletMetadata& tabletMeta, const RecordKey& recordName)
{ {
filestr << "LSMP remove called" << std::endl; DHT_DEBUG_STREAM() << "LSMP remove called";
if(recordName.name().length() > (isOrdered_ ? SuLimits::MAX_ORDERED_RECORD_NAME_LENGTH : SuLimits::MAX_RECORD_NAME_LENGTH)) { if(recordName.name().length() > (isOrdered_ ? SuLimits::MAX_ORDERED_RECORD_NAME_LENGTH : SuLimits::MAX_RECORD_NAME_LENGTH)) {
return SuCode::PStoreIOFailed; return SuCode::PStoreIOFailed;
} }
@ -536,15 +535,16 @@ remove(const TabletMetadata& tabletMeta, const RecordKey& recordName)
free(buf); free(buf);
return result ? SuCode::SuOk : SuCode::PStoreUnexpectedError; return result ? SuCode::SuOk : SuCode::PStoreUnexpectedError;
} else { } else {
filestr << "LSMP remove: record not found, or error" << std::endl; DHT_DEBUG_STREAM() << "LSMP remove: record not found, or error";
return rc; return rc;
} }
} }
bool LSMPersistentStoreImpl::ping() { bool LSMPersistentStoreImpl::ping() {
filestr << "LSMP ping called" << std::endl; DHT_DEBUG_STREAM() << "LSMP ping called";
datatuple * ret = logstore_client_op(l_, OP_DBG_NOOP); datatuple * ret = logstore_client_op(l_, OP_DBG_NOOP);
if(ret == NULL) { if(ret == NULL) {
DHT_WARN_STREAM() << "LSMP ping failed";
return false; return false;
} else { } else {
datatuple::freetuple(ret); datatuple::freetuple(ret);
@ -556,16 +556,12 @@ StorageRecordIterator LSMPersistentStoreImpl::
scan(const TabletMetadata& tabletMeta, const ScanContinuation& continuation, scan(const TabletMetadata& tabletMeta, const ScanContinuation& continuation,
ScanSelect::Selector selector, const uint64_t expiryTime, unsigned int scanLimit, size_t byteLimit) ScanSelect::Selector selector, const uint64_t expiryTime, unsigned int scanLimit, size_t byteLimit)
{ {
filestr << "LSMP scan called. " << std::endl; DHT_DEBUG_STREAM() << "LSMP scan called. Tablet: " << tabletMeta.getTabletId();
filestr << "LSMP scan called. Tablet: " << tabletMeta.getTabletId() << std::endl;
ScanContinuationAutoPtr newContinuation; ScanContinuationAutoPtr newContinuation;
TabletRangeAutoPtr tabletRange; TabletRangeAutoPtr tabletRange;
if (SuCode::SuOk != tabletMeta.keyRange(tabletRange)){ if (SuCode::SuOk != tabletMeta.keyRange(tabletRange)){
// BAD_CODE_ABORT("Bad tablet name"); BAD_CODE_ABORT("Bad tablet name");
filestr << "LSMP bad tablet name" << std::endl;
abort();
} }
/* This is necessary once we turn on splits, because multiple tablets /* This is necessary once we turn on splits, because multiple tablets
@ -580,7 +576,7 @@ scan(const TabletMetadata& tabletMeta, const ScanContinuation& continuation,
selector, /*getMetadataOnly,*/ selector, /*getMetadataOnly,*/
expiryTime, scanLimit, byteLimit); expiryTime, scanLimit, byteLimit);
filestr << "LSMP scan returns. Error = " << iter->error << std::endl; DHT_DEBUG_STREAM() << "LSMP scan returns. Error = " << iter->error;
return StorageRecordIterator(iter); return StorageRecordIterator(iter);
} }
@ -590,7 +586,7 @@ getSnapshotExporter(const TabletMetadata& tabletMeta,
const std::string& snapshotId, const std::string& snapshotId,
SnapshotExporterAutoPtr& exporter) SnapshotExporterAutoPtr& exporter)
{ {
filestr << "LSMP getSnapshotExported called" << std::endl; DHT_WARN_STREAM() << "Unimplemented: LSMP getSnapshotExported called";
/* const std::string& mySQLTableName = tabletMeta.getTabletId(); /* const std::string& mySQLTableName = tabletMeta.getTabletId();
TabletRangeAutoPtr tabletRange; TabletRangeAutoPtr tabletRange;
@ -610,7 +606,7 @@ getSnapshotImporter(const TabletMetadata& tabletMeta,
const std::string& snapshotId, const std::string& snapshotId,
SnapshotImporterAutoPtr& importer) SnapshotImporterAutoPtr& importer)
{ {
filestr << "LSMP getSnapshotImporter called" << std::endl; DHT_WARN_STREAM() << "Unimplemented: getSnapshotImporter called";
/* if (version == LSMSnapshotExporter::VERSION){ /* if (version == LSMSnapshotExporter::VERSION){
const std::string& mySQLTableName = tabletMeta.getTabletId(); const std::string& mySQLTableName = tabletMeta.getTabletId();
importer=LSMSnapshotExporter::getImporter(tabletMeta.table(), importer=LSMSnapshotExporter::getImporter(tabletMeta.table(),
@ -634,8 +630,7 @@ getIncomingCopyProgress(const TabletMetadata& metadata,
int64_t& current, int64_t& current,
int64_t& estimated) const int64_t& estimated) const
{ {
*const_cast<std::fstream*>(&filestr) << "LSMP getIncomingCopyProgress called" << std::endl; DHT_DEBUG_STREAM() << "Unimplemented: LSMP getIncomingCopyProgress called";
fprintf(stderr, "unsupported method getIncomingCopyProgrees called\n");
//This will be a problem when we have more than 1 //This will be a problem when we have more than 1
//exporter/importer type. We will have to store the //exporter/importer type. We will have to store the
@ -658,8 +653,7 @@ getOutgoingCopyProgress(const TabletMetadata& metadata,
int64_t& current, int64_t& current,
int64_t& estimated) const int64_t& estimated) const
{ {
*const_cast<std::fstream*>(&filestr) << "LSMP getOutgoingCopyProgress called" << std::endl; DHT_DEBUG_STREAM() << "Unimplemented: LSMP getOutgoingCopyProgress called";
fprintf(stderr, "unsupported method getOutgoingCopyProgrees called\n");
current = 1024*1024*1024; current = 1024*1024*1024;
estimated = 1024*1024*1024; estimated = 1024*1024*1024;
return SuCode::SuOk; return SuCode::SuOk;

View file

@ -9,9 +9,6 @@
#ifndef LSM_PSTORE_IMPL_H #ifndef LSM_PSTORE_IMPL_H
#define LSM_PSTORE_IMPL_H #define LSM_PSTORE_IMPL_H
#include <fstream>
#include <iostream>
#include "PersistentStore.h" #include "PersistentStore.h"
#include "datatuple.h" #include "datatuple.h"
//#include "LSMCoreImpl.h" //#include "LSMCoreImpl.h"
@ -25,6 +22,7 @@ class LSMPersistentStoreImpl : public PersistentStore
friend class LSMIterator; friend class LSMIterator;
friend class LSMPersistentParent; friend class LSMPersistentParent;
protected: protected:
// LSMCoreImpl& mySQLCoreImpl_; // LSMCoreImpl& mySQLCoreImpl_;
bool isOrdered_; bool isOrdered_;
unsigned char * my_strcat(const std::string& table, unsigned char * my_strcat(const std::string& table,
@ -44,7 +42,6 @@ protected:
SuCode::ResponseCode val_buf(StorageRecord &ret, SuCode::ResponseCode val_buf(StorageRecord &ret,
const unsigned char * buf, size_t buf_len); const unsigned char * buf, size_t buf_len);
public: public:
std::fstream filestr;
LSMPersistentStoreImpl(bool ordered); LSMPersistentStoreImpl(bool ordered);
virtual ~LSMPersistentStoreImpl(); virtual ~LSMPersistentStoreImpl();

View file

@ -19,4 +19,4 @@ tcpclient.cpp : ../tcpclient.cpp
ln -s ../tcpclient.cpp ln -s ../tcpclient.cpp
clean: clean:
rm -f LSMPersistentStoreImpl.o tcpclient.o libsherpalogstore.so rm -f LSMPersistentStoreImpl.o tcpclient.o libsherpalogstore.so LSMPersistentParentImpl.o