forked from ResEnv/chain-api
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpostgres_to_influx.py
66 lines (60 loc) · 2.42 KB
/
postgres_to_influx.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from __future__ import print_function
from chain.core.models import ScalarData
from django.utils import timezone
from django.db import models
from datetime import datetime
from chain.influx_client import InfluxClient, HTTP_STATUS_SUCCESSFUL_WRITE
from chain.core.resources import influx_client
from sys import stdout
# needs to be run from the manage.py shell context. Entry point is
# `migrate_data`
# Width, in consecutive IDs, of each ID range handed to one batch.
BATCH_SIZE = 10000

# The literal is an epoch time in nanoseconds; dividing by 1e9 yields the
# seconds value that `utcfromtimestamp` expects. The naive result is then
# tagged as UTC.
_FIRST_TIMESTAMP_SECONDS = float(1481811788574987776) / 1e9
FIRST_TIMESTAMP = datetime.utcfromtimestamp(
    _FIRST_TIMESTAMP_SECONDS).replace(tzinfo=timezone.utc)
def migrate_data(offset, limit=float('inf')):
    '''Copy ScalarData rows to Influx in fixed-width ID batches.

    Walks the ID space from ``min_id`` to ``max_id`` in steps of
    ``BATCH_SIZE``, hands each ID range to ``post_points`` and prints
    progress to stdout. Nothing is returned.

    offset -- accepted for interface compatibility but currently unused;
        the walk always starts at ``min_id``.
    limit -- soft cap on the number of points to move. It is checked after
        each batch, so the final batch may overshoot it.
    '''
    # NOTE: the ID bounds are hard-coded. The disabled alternative is to
    # compute them from the database:
    #   ScalarData.objects.filter(
    #       timestamp__lt=FIRST_TIMESTAMP.isoformat()
    #   ).aggregate(min=models.Min('id'), max=models.Max('id'))
    min_id = 0
    max_id = 1068348868
    # Fixed: the second placeholder used to be {0}, printing min_id twice.
    print('Got min ID {0} and max ID {1}'.format(min_id, max_id))

    moved = 0
    for start in range(min_id, max_id + 1, BATCH_SIZE):
        print('Start moving objects[{0}:{1}]...'.format(
            start, start + BATCH_SIZE), end='')
        stdout.flush()
        # `id__range` is inclusive on both ends, hence the -1.
        batch = ScalarData.objects.filter(
            id__range=(start, start + BATCH_SIZE - 1))
        moved_count = post_points(batch)
        print('Moved {0} objects'.format(moved_count))
        stdout.flush()
        moved += moved_count
        if moved >= limit:
            break
def post_points(queryset):
    """Copy the points in ``queryset`` to Influx in a single write request.

    Iterating ``queryset`` performs the Postgres query. Each point becomes
    one line of the form
    ``<measurement>,sensor_id=<id> value=<value> <timestamp>`` and all lines
    are sent in one POST to the ``/write`` endpoint of ``influx_client``.
    The timestamp of the last point is printed as a progress marker.

    Returns the number of data points copied. An empty ``queryset`` returns
    0 without contacting Influx.

    Raises RuntimeError if Influx answers with a status other than
    HTTP_STATUS_SUCCESSFUL_WRITE.
    """
    # Collect the lines in a list and join once, rather than growing a
    # string with += on every point.
    lines = []
    last_point = None
    for point in queryset:
        lines.append('{0},sensor_id={1} value={2} {3}\n'.format(
            influx_client._measurement,
            point.sensor_id,
            point.value,
            InfluxClient.convert_timestamp(point.timestamp)))
        last_point = point

    # Nothing to send: skip the request. Previously an empty batch reached
    # the progress print below with `point` unbound and raised.
    if not lines:
        return 0

    response = influx_client.request('POST',
                                     influx_client._url + '/write',
                                     {'db': influx_client._database},
                                     ''.join(lines))
    # print the timestamp of the last point so we get some sense of where we
    # are
    print("[{0}] ".format(last_point.timestamp), end='')
    stdout.flush()
    if response.status_code != HTTP_STATUS_SUCCESSFUL_WRITE:
        raise RuntimeError("Influx returned status {0}".format(
            response.status_code))
    return len(lines)