/* Copyright (c) 2016-2020 Stanford University
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#ifndef RUNTIME_NANOLOG_H
#define RUNTIME_NANOLOG_H
#include <aio.h>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include "Config.h"
#include "Common.h"
#include "Fence.h"
#include "Log.h"
#include "NanoLog.h"
#include "Util.h"
namespace NanoLogInternal {
using namespace NanoLog;
/**
* RuntimeLogger provides runtime support to the C++ code generated by the
* Preprocessor component.
* Its main responsibilities are to manage fast thread-local storage to stage
* uncompressed log messages and manage a background thread to compress the
* log messages to an output file.
*/
class RuntimeLogger {
public:
/**
* See registerInvocationSite() below for documentation.
*/
inline void
registerInvocationSite_internal(int &logId, StaticLogInfo info) {
// TODO(syang0) Make this into a spin lock
std::lock_guard<std::mutex>
lock(nanoLogSingleton.registrationMutex);
if (logId != UNASSIGNED_LOGID)
return;
logId = static_cast<int32_t>(invocationSites.size());
invocationSites.push_back(info);
#ifdef ENABLE_DEBUG_PRINTING
printf("Registered '%s' as id=%d\r\n", info.formatString, logId);
printf("\tisParamString [%p] = ", info.isArgString);
for (int i = 0; i < info.numParams; ++i)
printf("%d ", info.isArgString[i]);
printf("\r\n");
#endif
}
/**
* Assigns a globally unique identifier to static log information and
* stages it for persistence to disk.
*
* \param info
* Static log info to associate and persist
*
* \param[in,out] logId
* Unique log identifier to be assigned. A value other than -1
* indicates that the id has already been assigned and this
* function becomes a no-op.
*/
static inline void
registerInvocationSite(StaticLogInfo info, int &logId) {
nanoLogSingleton.registerInvocationSite_internal(logId, info);
}
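// Illustrative call pattern (a hedged sketch only; the real call sites are
// emitted by the Preprocessor or the non-preprocessor front-end): each log
// site keeps a static id that starts out unassigned and is registered once,
// where info is the StaticLogInfo describing that log statement.
//
//      static int logId = UNASSIGNED_LOGID;    // i.e. -1, not yet assigned
//      RuntimeLogger::registerInvocationSite(info, logId);
//      // logId now holds a unique identifier; later calls are no-ops.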
/**
* Allocate thread-local space for the generated C++ code to store an
* uncompressed log message, but do not make it available for compression
* yet. The caller should invoke finishAlloc() to make the space visible
* to the compression thread; this function shall not be invoked
* again until the corresponding finishAlloc() has been invoked.
*
* Note this will block if the buffer is full.
*
* \param nbytes
* number of bytes to allocate in the StagingBuffer
*
* \return
* pointer to the allocated space
*/
static inline char *
reserveAlloc(size_t nbytes) {
if (stagingBuffer == nullptr)
nanoLogSingleton.ensureStagingBufferAllocated();
// NOLINTNEXTLINE(clang-analyzer-core.CallAndMessage)
return stagingBuffer->reserveProducerSpace(nbytes);
}
/**
* Complement to reserveAlloc, makes the bytes previously
* reserveAlloc()-ed visible to the compression/output thread.
*
* \param nbytes
* Number of bytes to make visible
*/
static inline void
finishAlloc(size_t nbytes) {
stagingBuffer->finishReservation(nbytes);
}
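// Illustrative producer-side usage (a sketch only; the actual sequence is
// generated by the Preprocessor): reserve space, fill in the uncompressed
// log entry, then publish it to the compression thread.
//
//      size_t nbytes = ...;    // total size of this uncompressed log entry
//      char *buf = RuntimeLogger::reserveAlloc(nbytes);
//      // ... write the log entry header and arguments into buf ...
//      RuntimeLogger::finishAlloc(nbytes);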
static std::string getStats();
static std::string getHistograms();
static void preallocate();
static void setLogFile(const char *filename);
static void setLogLevel(LogLevel logLevel);
static void sync();
static inline LogLevel getLogLevel() {
return nanoLogSingleton.currentLogLevel;
}
static inline int getCoreIdOfBackgroundThread() {
return nanoLogSingleton.coreId;
}
PRIVATE:
// Forward Declarations
class StagingBuffer;
class StagingBufferDestroyer;
// Storage for staging uncompressed log statements for compression
static __thread StagingBuffer *stagingBuffer;
// Destroys the __thread StagingBuffer upon its own destruction, which
// is synchronized with thread death
static thread_local StagingBufferDestroyer sbc;
// Singleton RuntimeLogger that manages the thread-local structures and
// background output thread.
static RuntimeLogger nanoLogSingleton;
RuntimeLogger();
~RuntimeLogger();
void compressionThreadMain();
void setLogFile_internal(const char *filename);
void waitForAIO();
/**
* Allocates thread-local structures if they weren't already allocated.
* This is used by the generated C++ code to ensure it has space to
* log uncompressed messages to, and by the user if they wish to
* preallocate the data structures on thread creation.
*/
inline void
ensureStagingBufferAllocated() {
if (stagingBuffer == nullptr) {
std::unique_lock<std::mutex> guard(bufferMutex);
uint32_t bufferId = nextBufferId++;
// Unlocked for the expensive StagingBuffer allocation
guard.unlock();
stagingBuffer = new StagingBuffer(bufferId);
guard.lock();
threadBuffers.push_back(stagingBuffer);
}
}
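// Illustrative usage (a sketch): a thread that wants to avoid paying the
// StagingBuffer allocation cost on its first log statement can call the
// public preallocate() API right after it starts, which presumably just
// forwards to ensureStagingBufferAllocated() on the singleton:
//
//      RuntimeLogger::preallocate();  // allocates this thread's StagingBuffer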
// Globally tracks all the thread-local StagingBuffers
std::vector<StagingBuffer *> threadBuffers;
// Stores the id for the next StagingBuffer to be allocated. The ids are
// unique for this execution for each StagingBuffer allocation.
uint32_t nextBufferId = 1;
// Protects reads and writes to threadBuffers
std::mutex bufferMutex;
// Background thread that polls the various staging buffers, compresses
// the staged log messages, and outputs them to a file.
std::thread compressionThread;
// Indicates there's an operation in aioCb that should be waited on
bool hasOutstandingOperation;
// Flag signaling the compressionThread to stop running. This is
// typically only set in testing or when the application is exiting.
bool compressionThreadShouldExit;
// Marks the progress of flushing all log messages to disk after a user
// invokes the sync() API. To complete the operation, the background
// thread has to make two passes through the staging buffers and wait
// on the AIO to complete before waking up the user thread.
enum {
SYNC_REQUESTED, // User invoked a sync() operation
PERFORMING_SECOND_PASS, // Background thread is making a second pass
WAITING_ON_AIO, // Background thread is waiting on AIO
SYNC_COMPLETED // Operation complete/no requests
} syncStatus;
// Protects the condition variables below
std::mutex condMutex;
// Signal for when the compression thread should wakeup
std::condition_variable workAdded;
// Signaled when the background thread completes a sync() operation and
// the user thread should wake up.
std::condition_variable hintSyncCompleted;
// File handle for the output file; should only be opened once at the
// construction of the RuntimeLogger
int outputFd;
// POSIX AIO structure used to communicate async IO requests
struct aiocb aioCb;
// Dynamically allocated buffer used to stage the compressed log messages
// before handing them over to the POSIX AIO library for output.
char *compressingBuffer;
// Dynamically allocated double buffer that is swapped with the
// compressingBuffer when the latter is passed to the POSIX AIO library.
char *outputDoubleBuffer;
// Minimum log level that RuntimeLogger will accept. Anything lower will
// be dropped.
LogLevel currentLogLevel;
// Marks the rdtsc() when the current compression thread first started
// running. A value of 0 indicates the compression thread is not running
uint64_t cycleAtThreadStart;
// Marks the rdtsc() when the last I/O operation started
uint64_t cyclesAtLastAIOStart;
// Metric: Number of cycles compression thread is doing work
uint64_t cyclesActive;
// Metric: Amount of time spent compressing the dynamic log data
uint64_t cyclesCompressing;
// Metric: Stores the distribution of StagingBuffer peek sizes in 5%
// increments relative to the full size. This distribution should show
// how well the background thread keeps up with the logging threads.
uint64_t stagingBufferPeekDist[20];
// Metric: Amount of time spent scanning the buffers for work and
// compressing events found.
uint64_t cyclesScanningAndCompressing;
// Metric: Upper bound on the amount of time spent on fsync() and disk
// writes. It is an upper bound since the code polls for the async IO to complete.
uint64_t cyclesDiskIO_upperBound;
// Metric: Number of bytes read in from the staging buffers
uint64_t totalBytesRead;
// Metric: Number of bytes written to the output file (includes padding)
uint64_t totalBytesWritten;
// Metric: Number of pad bytes written to round the file to the nearest
// 512B
uint64_t padBytesWritten;
// Metric: Number of log statements compressed and outputted.
uint64_t logsProcessed;
// Metric: Number of times an AIO write was completed.
uint32_t numAioWritesCompleted;
// Stores the last coreId that the background thread ran in.
int coreId;
// Used to control access to invocationSites
std::mutex registrationMutex;
// Maps unique identifiers to log invocation sites encountered thus far
// by the non-preprocessor version of NanoLog
std::vector<StaticLogInfo> invocationSites;
// Indicates the index of the next invocationSite that needs to be
// persisted to disk.
uint32_t nextInvocationIndexToBePersisted;
/**
* Implements a circular FIFO producer/consumer byte queue that is used
* to hold the dynamic information of a NanoLog log statement (producer)
* as it waits for compression via the NanoLog background thread
* (consumer). There exists a StagingBuffer for every thread that uses
* the NanoLog system.
*/
class StagingBuffer {
public:
/**
* Attempt to reserve contiguous space for the producer without
* making it visible to the consumer. The caller should invoke
* finishReservation() before invoking reserveProducerSpace()
* again to make the bytes reserved visible to the consumer.
*
* This mechanism is in place to allow the producer to initialize
* the contents of the reservation before exposing it to the
* consumer. This function will block behind the consumer if
* there's not enough space.
*
* \param nbytes
* Number of bytes to allocate
*
* \return
* Pointer to at least nbytes of contiguous space
*/
inline char *
reserveProducerSpace(size_t nbytes) {
++numAllocations;
// Fast in-line path
if (nbytes < minFreeSpace)
return producerPos;
// Slow allocation
return reserveSpaceInternal(nbytes);
}
/**
* Complement to reserveProducerSpace that makes nbytes starting
* from the return of reserveProducerSpace visible to the consumer.
*
* \param nbytes
* Number of bytes to expose to the consumer
*/
inline void
finishReservation(size_t nbytes) {
assert(nbytes < minFreeSpace);
assert(producerPos + nbytes <
storage + NanoLogConfig::STAGING_BUFFER_SIZE);
Fence::sfence(); // Ensures producer finishes writes before bump
minFreeSpace -= nbytes;
producerPos += nbytes;
}
char *peek(uint64_t *bytesAvailable);
/**
* Consumes the next nbytes in the StagingBuffer and frees them back
* for the producer to reuse. nbytes must be less than what is
* returned by peek().
*
* \param nbytes
* Number of bytes to return back to the producer
*/
inline void
consume(uint64_t nbytes) {
Fence::lfence(); // Make sure consumer reads finish before bump
consumerPos += nbytes;
}
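// Illustrative consumer-side usage (a sketch of how the background
// compression thread might drain this buffer, not the actual code), where
// buffer is a StagingBuffer* taken from threadBuffers:
//
//      uint64_t bytesAvailable = 0;
//      char *src = buffer->peek(&bytesAvailable);
//      uint64_t bytesUsed = /* compress/copy out of src, bounded by
//                              bytesAvailable */;
//      buffer->consume(bytesUsed);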
/**
* Returns true if it's safe for the compression thread to delete
* the StagingBuffer and remove it from the global vector.
*
* \return
* true if it's safe to delete the StagingBuffer
*/
bool
checkCanDelete() {
return shouldDeallocate && consumerPos == producerPos;
}
uint32_t getId() {
return id;
}
StagingBuffer(uint32_t bufferId)
: producerPos(storage)
, endOfRecordedSpace(storage
+ NanoLogConfig::STAGING_BUFFER_SIZE)
, minFreeSpace(NanoLogConfig::STAGING_BUFFER_SIZE)
, cyclesProducerBlocked(0)
, numTimesProducerBlocked(0)
, numAllocations(0)
, cyclesProducerBlockedDist()
, cyclesIn10Ns(PerfUtils::Cycles::fromNanoseconds(10))
, cacheLineSpacer()
, consumerPos(storage)
, shouldDeallocate(false)
, id(bufferId)
, storage() {
// stagingBufferCreated() is an empty function, but invoking it causes the
// C++ runtime to instantiate the sbc thread_local (see documentation on
// that function).
sbc.stagingBufferCreated();
for (size_t i = 0; i < Util::arraySize(
cyclesProducerBlockedDist); ++i)
{
cyclesProducerBlockedDist[i] = 0;
}
}
~StagingBuffer() {
}
PRIVATE:
char *reserveSpaceInternal(size_t nbytes, bool blocking = true);
// Position within storage[] where the producer may place new data
char *producerPos;
// Marks the end of valid data for the consumer. Set by the producer
// on a roll-over
char *endOfRecordedSpace;
// Lower bound on the number of bytes the producer can allocate w/o
// rolling over the producerPos or stalling behind the consumer
uint64_t minFreeSpace;
// Number of cycles producer was blocked while waiting for space to
// free up in the StagingBuffer for an allocation.
uint64_t cyclesProducerBlocked;
// Number of times the producer was blocked while waiting for space
// to free up in the StagingBuffer for an allocation
uint32_t numTimesProducerBlocked;
// Number of alloc()'s performed
uint64_t numAllocations;
// Distribution of the number of times Producer was blocked
// allocating space in 10ns increments. The last slot includes
// all times greater than the last increment.
uint32_t cyclesProducerBlockedDist[20];
// Number of Cycles in 10ns. This is used to avoid the expensive
// Cycles::toNanoseconds() call to calculate the bucket in the
// cyclesProducerBlockedDist distribution.
uint64_t cyclesIn10Ns;
// An extra cache-line to separate the variables that are primarily
// updated/read by the producer (above) from the ones by the
// consumer (below)
char cacheLineSpacer[2*Util::BYTES_PER_CACHE_LINE];
// Position within the storage buffer where the consumer will consume
// the next bytes from. This value is only updated by the consumer.
char* volatile consumerPos;
// Indicates that the thread owning this StagingBuffer has been
// destructed (i.e. no more messages will be logged to it) and thus
// should be cleaned up once the buffer has been emptied by the
// compression thread.
bool shouldDeallocate;
// Uniquely identifies this StagingBuffer for this execution. It's
// similar to ThreadId, but is only assigned to threads that use NANO_LOG.
uint32_t id;
// Backing store used to implement the circular queue
char storage[NanoLogConfig::STAGING_BUFFER_SIZE];
friend RuntimeLogger;
friend StagingBufferDestroyer;
DISALLOW_COPY_AND_ASSIGN(StagingBuffer);
};
// This class is intended to be instantiated as a C++ thread_local to
// synchronize marking the thread local stagingBuffer for deletion with
// thread death.
//
// This class exists, rather than wrapping the stagingBuffer in a unique_ptr
// or declaring the stagingBuffer itself thread_local, for performance
// reasons: dereferencing the former costs 10 ns, and the latter allocates
// large amounts of resources for every thread that is created, which is
// wasteful for threads that do not use the RuntimeLogger.
class StagingBufferDestroyer {
public:
// TODO(syang0) I wonder if it'll be better if stagingBuffer was
// actually a thread_local wrapper with dereference operators
// implemented.
explicit StagingBufferDestroyer() {
}
// Weird C++ hack: C++ thread_local objects are instantiated upon first
// use, so the StagingBuffer has to invoke this function in order to
// instantiate this object.
void stagingBufferCreated() {}
virtual ~StagingBufferDestroyer() {
if (stagingBuffer != nullptr) {
stagingBuffer->shouldDeallocate = true;
stagingBuffer = nullptr;
}
}
};
DISALLOW_COPY_AND_ASSIGN(RuntimeLogger);
}; // RuntimeLogger
}; // Namespace NanoLogInternal
// MUST appear at the very end of the RuntimeLogger.h file, right before the
// last #endif. It serves as a marker for the preprocessor for where it can
// start injecting inlined, generated functions.
static const int __internal_dummy_variable_marker_for_code_injection = 0;
#endif /* RUNTIME_NANOLOG_H */