-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathpg_lz4.c
123 lines (98 loc) · 2.57 KB
/
pg_lz4.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#include "postgres.h"
#include "fmgr.h"
#include "lz4.h"
#include "access/cmapi.h"
#include "commands/defrem.h"
#include "nodes/parsenodes.h"
#include "utils/builtins.h"
#include "utils/datum.h"
PG_MODULE_MAGIC;
typedef struct
{
int acceleration;
} lz4_option;
PG_FUNCTION_INFO_V1( lz4_handler );
static void
lz4_check(Form_pg_attribute att, List *options)
{
ListCell *lc;
foreach (lc, options)
{
DefElem *def = (DefElem *) lfirst(lc);
if (strcmp(def->defname, "acceleration") == 0)
{
int accel = pg_atoi(defGetString(def), 4, 0);
if (accel < 1)
elog(WARNING, "Acceleration value <= 0 will be replaced by 1 (default)");
}
else
elog(ERROR, "Unknown option '%s'", def->defname);
}
}
static void *
lz4_initstate(Oid acoid, List *options)
{
ListCell *lc;
lz4_option *opt = (lz4_option *) palloc(sizeof(lz4_option));
/* default acceleration */
opt->acceleration = 1;
/* iterate through user options */
foreach (lc, options)
{
DefElem *def = (DefElem *) lfirst(lc);
if (strcmp(def->defname, "acceleration") == 0)
opt->acceleration = pg_atoi(defGetString(def), 4, 0);
else
elog(ERROR, "Unknown option '%s'", def->defname);
}
return (void *) opt;
}
static bytea *
lz4_compress(CompressionAmOptions *cmoptions, const bytea *value)
{
lz4_option *opt = (lz4_option *) cmoptions->acstate;
int src_len = (Size) VARSIZE_ANY_EXHDR(value);
int dst_len;
int len;
bytea *ret;
dst_len = LZ4_compressBound(src_len);
ret = (bytea *) palloc(dst_len + VARHDRSZ_CUSTOM_COMPRESSED);
len = LZ4_compress_fast((char *) VARDATA_ANY(value),
(char *) ret + VARHDRSZ_CUSTOM_COMPRESSED,
src_len, dst_len,
opt->acceleration);
if (len >= 0)
{
SET_VARSIZE_COMPRESSED(ret, len + VARHDRSZ_CUSTOM_COMPRESSED);
return ret;
}
pfree(ret);
return NULL;
}
static bytea *
lz4_decompress(CompressionAmOptions *cmoptions, const bytea *value)
{
int src_len = VARSIZE_ANY(value);
int dst_len = VARRAWSIZE_4B_C(value);
int len;
bytea *ret;
ret = (bytea *) palloc(dst_len + VARHDRSZ);
SET_VARSIZE(ret, dst_len + VARHDRSZ);
len = LZ4_decompress_safe((char *) value + VARHDRSZ_CUSTOM_COMPRESSED,
(char *) VARDATA(ret),
src_len - VARHDRSZ_CUSTOM_COMPRESSED,
dst_len);
if (len != dst_len)
elog(ERROR, "Decompression error");
return ret;
}
Datum
lz4_handler(PG_FUNCTION_ARGS)
{
CompressionAmRoutine *routine = makeNode(CompressionAmRoutine);
routine->cmcheck = lz4_check;
routine->cminitstate = lz4_initstate;
routine->cmcompress = lz4_compress;
routine->cmdecompress = lz4_decompress;
PG_RETURN_POINTER(routine);
}