Put && Delete && Get
这篇文章主要介绍一下leveldb中Put
和Delete
的接口。而Write
接口用来批量执行Put
或Delete
操作,在同一批(即同一个WriteBatch
)中的操作,其插入的entry的SequenceNumber相同。
1 2 3 4 5 6 7 8 9 10 11
| Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { WriteBatch batch; batch.Put(key, value); return Write(opt, &batch); }
Status DB::Delete(const WriteOptions& opt, const Slice& key) { WriteBatch batch; batch.Delete(key); return Write(opt, &batch); }
|
WriteBatch
WriteBatch
是用来记录需要批处理的若干个Put
或Delete
操作的结构体。LevelDB会为同一个WriteBatch
中所有操作分配相同的SequenceNumber。WriteBatch
的结构如下图所示:
每个WriteBatch
在内存中是一个连续的字节数组,其头12个字节是WriteBatch的Header,其中前8字节是该WriteBatch的SequenceNumber,后4字节是该WriteBatch中Entry的数量。Header后面连续存放着WriteBatch中的Entry。每个Entry的头一个字节标识了该Entry的操作类型。对于Put操作的Entry,在类型标识后以LengthPrefixedSlice编码存放着key和value;对于Delete操作的Entry,在类型标识后以LengthPrefixedSlice编码存放着待删除忽的key。
WriteBatch
的用法如下:
1 2 3 4 5
| leveldb::WriteBatch batch; batch.Put(key1, value1); batch.Delete(key1); batch.Put(key2, value); s = db->Write(leveldb::WriteOptions(), &batch);
|
为了优化写性能,LevelDB中提供了写合并能力,队首线程可以将队列中多个连续的写任务合并为一个写任务。
- 若队首写任务的WriteOption中sync属性为false且待合并的任务sync属性为true,则不能被合并。如下图中假设W1、W2、W3、W4、W5的sync属性为false,W6的sync属性为true,则W1~5将被合并为一个写任务,随后开始写日志文件(AddRecord)和写内存数据库(InsertIntoMemtable)。
- 写日志和内存数据库时会释放写锁,允许其他线程向写队列中添加写任务,当写任务完成时将再次加锁,并更新当前版本的操作序列号(更新前的序列号加上本次写任务的写操作数量)。
- 当前写任务完成时,写队列中可能被其他线程插入了新的任务(W9~11),已完成的写任务将陆续出队列,最后唤醒当前队首线程(W6)进行新一轮的写任务。
我们来看相关的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { Writer w(&mutex_); w.batch = updates; w.sync = options.sync; w.done = false;
MutexLock l(&mutex_); writers_.push_back(&w); while (!w.done && &w != writers_.front()) { w.cv.Wait(); }
if (w.done) { return w.status; } Status status = MakeRoomForWrite(updates == nullptr); uint64_t last_sequence = versions_->LastSequence(); Writer* last_writer = &w; if (status.ok() && updates != nullptr) { WriteBatch* write_batch = BuildBatchGroup(&last_writer); WriteBatchInternal::SetSequence(write_batch, last_sequence + 1); last_sequence += WriteBatchInternal::Count(write_batch);
{ mutex_.Unlock(); status = log_->AddRecord(WriteBatchInternal::Contents(write_batch)); bool sync_error = false; if (status.ok() && options.sync) { status = logfile_->Sync(); if (!status.ok()) { sync_error = true; } }
if (status.ok()) { status = WriteBatchInternal::InsertInto(write_batch, mem_); } mutex_.Lock(); } versions_->SetLastSequence(last_sequence); }
while (true) { Writer* ready = writers_.front(); writers_.pop_front(); if (ready != &w) { ready->status = status; ready->done = true; ready->cv.Signal(); } if (ready == last_writer) break; }
if (!writers_.empty()) { writers_.front()->cv.Signal(); }
return status; }
|
Get操作
首先这里为mutex_
进行加锁,为什么需要MutexLock
类?
这里利用了析构函数对mutex_进行解锁的,退出这个函数的作用域就会调用MutexLock析构函数,不会出现忘记解锁导致的问题。这种机制也叫作RALL(Resource Acquisition Is Initialization)。
然后从options
里面获取快照信息。
然后获取mem_
、imm_
和current_
,将依次在这里面进行查询。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| Status s; MutexLock l(&mutex_); SequenceNumber snapshot; if (options.snapshot != nullptr) { snapshot = static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number(); } else { snapshot = versions_->LastSequence(); }
MemTable* mem = mem_; MemTable* imm = imm_; Version* current = versions_->current(); mem->Ref(); if (imm != nullptr) imm->Ref(); current->Ref();
|
然后构造LookupKey
,在三个层次中进行查询就可以了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| bool have_stat_update = false; Version::GetStats stats;
{ mutex_.Unlock(); LookupKey lkey(key, snapshot); if (mem->Get(lkey, value, &s)) { } else if (imm != nullptr && imm->Get(lkey, value, &s)) { } else { s = current->Get(options, lkey, value, &stats); have_stat_update = true; } mutex_.Lock(); }
if (have_stat_update && current->UpdateStats(stats)) { MaybeScheduleCompaction(); } mem->Unref(); if (imm != nullptr) imm->Unref(); current->Unref(); return s;
|