tx_entry.h
#pragma once

#include "alloc/alloc.h"
#include "block/tx.h"
#include "idx.h"
#include "mem_table.h"
#include "utils/timer.h"

namespace madfs::dram {
/**
 * @brief A TxCursor is a pointer to a transaction entry. It contains a
 * TxEntryIdx and a pointer to the block containing the entry. It is 16 bytes
 * in size and can be passed around by value in registers.
 */
struct TxCursor {
  TxEntryIdx idx;
  union {
    pmem::TxBlock* block;
    pmem::MetaBlock* meta;
    void* addr;
  };

  TxCursor() : idx(), addr(nullptr) {}
  TxCursor(TxEntryIdx idx, void* addr) : idx(idx), addr(addr) {}

  static TxCursor from_meta(pmem::MetaBlock* meta) { return {{}, meta}; }
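
  // Illustrative sketch (not from the original header): a cursor is cheap to
  // copy, so call sites can start at the meta block's inline entries and pass
  // copies around by value:
  //
  //   TxCursor cursor = TxCursor::from_meta(meta);  // start at inline entries
  //   TxCursor saved = cursor;  // 16-byte copy; mutating one leaves the other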

  /**
   * @return the tx entry pointed to by this cursor
   */
  [[nodiscard]] pmem::TxEntry get_entry() const {
    TimerGuard<Event::TX_ENTRY_LOAD> timer_guard;
    assert(addr != nullptr);
    std::atomic<pmem::TxEntry>* entries =
        idx.is_inline() ? meta->tx_entries : block->tx_entries;
    assert(idx.local_idx < idx.get_capacity());
    return entries[idx.local_idx].load(std::memory_order_acquire);
  }

  /**
   * If the cursor is in an overflow state (local_idx past the capacity of its
   * current block), move it into the next tx block if possible.
   *
   * @param[in] mem_table used to find the memory address of the next block
   * @param[in] allocator if given, used to allocate a new tx block when no
   * next block exists yet
   * @param[out] into_new_block if not nullptr, set to whether the cursor moved
   * into a new tx block (i.e., it was previously in an overflow state)
   * @return true if the cursor is no longer in an overflow state; false if it
   * still is (no next block exists and no allocator was given)
   */
  bool handle_overflow(MemTable* mem_table, Allocator* allocator = nullptr,
                       bool* into_new_block = nullptr) {
    const bool is_inline = idx.is_inline();
    uint16_t capacity = idx.get_capacity();
    if (unlikely(idx.local_idx >= capacity)) {
      if (into_new_block) *into_new_block = true;
      LogicalBlockIdx block_idx =
          is_inline ? meta->get_next_tx_block() : block->get_next_tx_block();
      if (block_idx == 0) {
        if (!allocator) return false;
        const auto& [new_block_idx, new_block] =
            is_inline ? allocator->tx_block.alloc_next(meta)
                      : allocator->tx_block.alloc_next(block);
        idx.block_idx = new_block_idx;
        block = new_block;
        idx.local_idx -= capacity;
      } else {
        idx.block_idx = block_idx;
        idx.local_idx -= capacity;
        block = &mem_table->lidx_to_addr_rw(idx.block_idx)->tx_block;
      }
    } else {
      if (into_new_block) *into_new_block = false;
    }
    return true;
  }
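
  // Illustrative sketch (assuming valid `mem_table`/`allocator` pointers from
  // the surrounding context): once local_idx reaches the block's capacity,
  // handle_overflow() either follows the existing next-block link or, given
  // an allocator, appends a fresh tx block:
  //
  //   cursor.idx.local_idx = cursor.idx.get_capacity();  // overflow state
  //   bool ok = cursor.handle_overflow(mem_table, allocator);
  //   assert(ok);  // with an allocator, the overflow is always resolved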

  /**
   * Advance the cursor to the next transaction entry.
   *
   * @param[in] mem_table used to find the memory address of the next block
   * @param[in] allocator if given, allocate a new block when reaching the end
   * of the current block
   * @param[out] into_new_block if not nullptr, set to whether the cursor moved
   * into a new tx block
   *
   * @return true on success; false when the end of a block is reached and no
   * allocator is given. The increment happens either way, but on false the
   * cursor is left in an overflow state.
   */
  bool advance(MemTable* mem_table, Allocator* allocator = nullptr,
               bool* into_new_block = nullptr) {
    idx.local_idx++;
    return handle_overflow(mem_table, allocator, into_new_block);
  }
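
  // Illustrative sketch: a read-only scan pairs get_entry() with advance();
  // with no allocator, advance() returns false at the end of the last
  // allocated block. `is_valid()` stands in for whatever emptiness check
  // pmem::TxEntry provides (hypothetical name):
  //
  //   TxCursor cursor = TxCursor::from_meta(meta);
  //   while (true) {
  //     pmem::TxEntry entry = cursor.get_entry();
  //     if (!entry.is_valid()) break;  // reached the not-yet-committed tail
  //     apply(entry);                  // hypothetical per-entry handler
  //     if (!cursor.advance(mem_table)) break;  // end of allocated blocks
  //   }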

  /**
   * Try to commit a tx entry at the current cursor position.
   *
   * @param entry entry to commit
   * @param mem_table used to find the memory address of the next block and
   * the meta block
   * @param allocator used to allocate new tx blocks on overflow
   * @return an empty entry on success; the conflicting entry otherwise
   */
  pmem::TxEntry try_commit(pmem::TxEntry entry, MemTable* mem_table,
                           Allocator* allocator) {
    // an allocator is provided, so overflow handling cannot fail here
    this->handle_overflow(mem_table, allocator);
    if (pmem::TxEntry::need_flush(this->idx.local_idx)) {
      pmem::MetaBlock* meta = mem_table->get_meta();
      TxCursor::flush_up_to(mem_table, meta, *this);
      meta->set_flushed_tx_tail(this->idx);
    }
    return this->try_append(entry);
  }
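
  // Illustrative sketch: committers typically retry on conflict, processing
  // the conflicting entry and advancing past it. `is_valid()` and
  // `handle_conflict()` are hypothetical stand-ins:
  //
  //   pmem::TxEntry conflict = cursor.try_commit(entry, mem_table, allocator);
  //   while (conflict.is_valid()) {        // slot was taken by someone else
  //     handle_conflict(conflict);         // e.g., re-run conflict detection
  //     cursor.advance(mem_table, allocator);
  //     conflict = cursor.try_commit(entry, mem_table, allocator);
  //   }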

  /**
   * Flush from the tail recorded in the meta block up to `end`.
   *
   * @param mem_table used to find the memory address of the next block
   * @param meta the meta block used to find the flushed tx tail
   * @param end the end of the range to flush
   */
  static void flush_up_to(MemTable* mem_table, pmem::MetaBlock* meta,
                          TxCursor end) {
    TxEntryIdx begin_idx = meta->get_flushed_tx_tail();
    void* addr =
        begin_idx.is_inline()
            ? static_cast<void*>(meta)
            : mem_table->lidx_to_addr_rw(begin_idx.block_idx)->data_rw();
    flush_range(mem_table, TxCursor(begin_idx, addr), end);
  }
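
  // Note (inferred from try_commit() above, not stated in the original
  // header): entries are flushed before set_flushed_tx_tail() publishes the
  // new tail, so a crash may lose the tail hint but never persists a tail
  // pointing past unflushed entries.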

 private:
  /**
   * Try to append a tx entry at the location pointed to by the cursor; fail
   * if the slot is already taken (likely due to a race with another
   * committer).
   *
   * @return an empty entry (0) on success; otherwise, the entry occupying
   * the slot
   */
  [[nodiscard]] pmem::TxEntry try_append(pmem::TxEntry entry) const {
    TimerGuard<Event::TX_ENTRY_STORE> timer_guard;
    assert(addr != nullptr);
    std::atomic<pmem::TxEntry>* entries =
        idx.is_inline() ? meta->tx_entries : block->tx_entries;
    pmem::TxEntry expected = 0;
    entries[idx.local_idx].compare_exchange_strong(
        expected, entry, std::memory_order_acq_rel, std::memory_order_acquire);
    // on CAS failure, `expected` holds the value found in entries[idx];
    // on success, it remains 0 (an empty entry)
    return expected;
  }

  static void flush_range(MemTable* mem_table, TxCursor begin, TxCursor end) {
    if (begin >= end) return;
    pmem::TxBlock* tx_block_begin;
    // handle the special case of inline tx entries in the meta block
    if (begin.idx.block_idx == 0) {
      if (end.idx.block_idx == 0) {
        begin.meta->flush_tx_entries(begin.idx.local_idx, end.idx.local_idx);
        goto done;
      }
      begin.meta->flush_tx_block(begin.idx.local_idx);
      // now the next block is the "new begin"
      begin.idx = {begin.meta->get_next_tx_block(), 0};
    }
    while (begin.idx.block_idx != end.idx.block_idx) {
      tx_block_begin =
          &mem_table->lidx_to_addr_rw(begin.idx.block_idx)->tx_block;
      tx_block_begin->flush_tx_block(begin.idx.local_idx);
      begin.idx = {tx_block_begin->get_next_tx_block(), 0};
    }
    // special case: if `end` is the first entry of the next block, the loop
    // above already flushed everything, so there is no need to dereference
    // and flush the last block
    if (begin.idx.local_idx == end.idx.local_idx) goto done;
    end.block->flush_tx_entries(begin.idx.local_idx, end.idx.local_idx);
  done:
    fence();
  }

  /**
   * Move along the linked list of TxBlock and find the tail. The returned
   * tail may not be up-to-date due to concurrent appends. No new blocks will
   * be allocated. If the end of the TxBlock is reached, just return
   * NUM_TX_ENTRY as the TxLocalIdx.
   */
  static TxCursor find_tail(MemTable* mem_table, TxCursor cursor) {
    LogicalBlockIdx next_block_idx;
    if (cursor.idx.block_idx == 0) {  // search from meta
      // if the meta block has no next block, the tail must be within its
      // inline entries
      if ((next_block_idx = cursor.meta->get_next_tx_block()) == 0) {
        cursor.idx.local_idx = cursor.meta->find_tail(cursor.idx.local_idx);
        return cursor;
      } else {
        cursor.idx.block_idx = next_block_idx;
        cursor.idx.local_idx = 0;
        cursor.block =
            &mem_table->lidx_to_addr_rw(cursor.idx.block_idx)->tx_block;
      }
    }
    while ((next_block_idx = cursor.block->get_next_tx_block()) != 0) {
      cursor.idx.block_idx = next_block_idx;
      cursor.idx.local_idx = 0;  // the old hint is meaningless in a new block
      cursor.block = &(mem_table->lidx_to_addr_rw(next_block_idx)->tx_block);
    }
    cursor.idx.local_idx = cursor.block->find_tail(cursor.idx.local_idx);
    return cursor;
  }
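
  // Illustrative sketch: find_tail() lets a committer catch up before
  // appending; the result may already be stale under concurrency, which is
  // why try_append() above still uses a compare-and-swap:
  //
  //   TxCursor cursor = TxCursor::from_meta(meta);
  //   cursor = find_tail(mem_table, cursor);  // skip to (approximate) tail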

 public:
  friend bool operator==(const TxCursor& lhs, const TxCursor& rhs) {
    return lhs.idx == rhs.idx && lhs.block == rhs.block;
  }
  friend bool operator!=(const TxCursor& lhs, const TxCursor& rhs) {
    return !(rhs == lhs);
  }
  // ordering: cursors in the same block compare by slot; the meta block's
  // inline entries come before any tx block; otherwise, blocks compare by
  // their sequence number in the linked list
  friend bool operator<(const TxCursor& lhs, const TxCursor& rhs) {
    if (lhs.idx.block_idx == rhs.idx.block_idx)
      return lhs.idx.local_idx < rhs.idx.local_idx;
    if (lhs.idx.block_idx == 0) return true;
    if (rhs.idx.block_idx == 0) return false;
    return lhs.block->get_tx_seq() < rhs.block->get_tx_seq();
  }
  friend bool operator>(const TxCursor& lhs, const TxCursor& rhs) {
    return rhs < lhs;
  }
  friend bool operator<=(const TxCursor& lhs, const TxCursor& rhs) {
    return !(rhs < lhs);
  }
  friend bool operator>=(const TxCursor& lhs, const TxCursor& rhs) {
    return !(lhs < rhs);
  }
};

static_assert(sizeof(TxCursor) == 16);

}  // namespace madfs::dram