diff --git a/sherpa/LSMPersistentStoreImpl.cc b/sherpa/LSMPersistentStoreImpl.cc index fc1d0c0..aff5ecd 100644 --- a/sherpa/LSMPersistentStoreImpl.cc +++ b/sherpa/LSMPersistentStoreImpl.cc @@ -9,6 +9,7 @@ #include "ScanContinuation.h" #include "OrderedScanContinuation.h" #include "TabletRange.h" +#include "SuLimits.h" #include "dht/UtilityBuffer.h" //#include @@ -347,8 +348,14 @@ addEmptyTablet(TabletMetadata& tabletMeta) filestr << "LSMP addEmptyTablet called" << std::endl; // This is a no-op; we'll simply prepend the tablet string to each record. { + // Is table name too long? + if(tabletMeta.table().length() > SuLimits::MAX_TABLE_NAME_LENGTH_DB) { + return SuCode::PStoreIOFailed; + } + const std::string& mySQLTableName = tabletMeta.getTabletId(); + if (mySQLTableName!=""){ filestr << "Tablet " << mySQLTableName << " already exists!" << std::endl; return SuCode::PStoreTabletAlreadyExists; @@ -425,6 +432,9 @@ SuCode::ResponseCode LSMPersistentStoreImpl:: get(const TabletMetadata& tabletMeta, StorageRecord& recordData) { filestr << "LSMP get called" << tabletMeta.getTabletId() << ":" << recordData.recordKey()<< std::endl; + if(recordData.recordKey().name().length() > (isOrdered_ ? SuLimits::MAX_ORDERED_RECORD_NAME_LENGTH : SuLimits::MAX_RECORD_NAME_LENGTH)) { + return SuCode::PStoreIOFailed; + } size_t buflen; unsigned char * buf = buf_key(tabletMeta, recordData.recordKey(), &buflen); datatuple * key_tup = datatuple::create(buf, buflen); @@ -449,6 +459,10 @@ SuCode::ResponseCode LSMPersistentStoreImpl:: update(const TabletMetadata& tabletMeta, const StorageRecord& updateData) { filestr << "LSMP update called" << std::endl; + if(updateData.recordKey().name().length() > (isOrdered_ ? SuLimits::MAX_ORDERED_RECORD_NAME_LENGTH : SuLimits::MAX_RECORD_NAME_LENGTH)) { + return SuCode::PStoreIOFailed; + } + SuCode::ResponseCode ret; { /// XXX hack. Copy of get() implementation, without the memcpy. size_t buflen; @@ -477,6 +491,9 @@ insert(const TabletMetadata& tabletMeta, const StorageRecord& insertData) { filestr << "LSMP insert called" << tabletMeta.getTabletId() << ":" << insertData.recordKey()<< std::endl; size_t keybuflen, valbuflen; + if(insertData.recordKey().name().length() > (isOrdered_ ? SuLimits::MAX_ORDERED_RECORD_NAME_LENGTH : SuLimits::MAX_RECORD_NAME_LENGTH)) { + return SuCode::PStoreIOFailed; + } unsigned char * keybuf = buf_key(tabletMeta, insertData.recordKey(), &keybuflen); filestr << "keybuf = " << keybuf << " (and perhaps a null)" << std::endl; unsigned char * valbuf = buf_val(insertData, &valbuflen); @@ -504,13 +521,24 @@ SuCode::ResponseCode LSMPersistentStoreImpl:: remove(const TabletMetadata& tabletMeta, const RecordKey& recordName) { filestr << "LSMP remove called" << std::endl; - size_t buflen; - unsigned char * buf = buf_key(tabletMeta, recordName, &buflen); - datatuple * key_ins = datatuple::create(buf, buflen); - datatuple * result = logstore_client_op(l_, OP_INSERT, key_ins); - datatuple::freetuple(key_ins); - free(buf); - return result ? SuCode::SuOk : SuCode::PStoreUnexpectedError; + if(recordName.name().length() > (isOrdered_ ? SuLimits::MAX_ORDERED_RECORD_NAME_LENGTH : SuLimits::MAX_RECORD_NAME_LENGTH)) { + return SuCode::PStoreIOFailed; + } + + StorageRecord tmp(recordName.name()); + SuCode::ResponseCode rc = get(tabletMeta, tmp); + if(SuCode::SuOk == rc) { + size_t buflen; + unsigned char * buf = buf_key(tabletMeta, recordName, &buflen); + datatuple * key_ins = datatuple::create(buf, buflen); + datatuple * result = logstore_client_op(l_, OP_INSERT, key_ins); + datatuple::freetuple(key_ins); + free(buf); + return result ? SuCode::SuOk : SuCode::PStoreUnexpectedError; + } else { + filestr << "LSMP remove: record not found, or error" << std::endl; + return rc; + } } bool LSMPersistentStoreImpl::ping() {