From 2527b32a20cd2784e251d32903e485cd5547adde Mon Sep 17 00:00:00 2001 From: Nathan Adams Date: Sun, 17 Jul 2022 21:15:58 +0200 Subject: [PATCH 1/4] Support InfluxDB 2.x as addition to 1.8 --- data/varken.example.ini | 1 + requirements.txt | 1 + varken/dbmanager.py | 76 ++++++++++++++++++++++++++++++++--------- varken/iniparser.py | 4 ++- varken/structures.py | 1 + varken/tautulli.py | 4 +-- 6 files changed, 67 insertions(+), 20 deletions(-) diff --git a/data/varken.example.ini b/data/varken.example.ini index b32eab6d..a63285c3 100644 --- a/data/varken.example.ini +++ b/data/varken.example.ini @@ -16,6 +16,7 @@ ssl = false verify_ssl = false username = root password = root +org = - [tautulli-1] url = tautulli.domain.tld:8181 diff --git a/requirements.txt b/requirements.txt index 523e4279..4477bd88 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,7 @@ requests==2.25.1 geoip2==2.9.0 influxdb==5.2.0 +influxdb-client==1.30.0 schedule==0.6.0 distro==1.4.0 urllib3==1.26.5 diff --git a/varken/dbmanager.py b/varken/dbmanager.py index c832fdf3..864d222f 100644 --- a/varken/dbmanager.py +++ b/varken/dbmanager.py @@ -1,45 +1,87 @@ +import re from sys import exit from logging import getLogger -from influxdb import InfluxDBClient -from requests.exceptions import ConnectionError -from influxdb.exceptions import InfluxDBServerError +from influxdb_client import InfluxDBClient, BucketRetentionRules +from influxdb_client.client.write_api import SYNCHRONOUS +from influxdb_client.client.exceptions import InfluxDBError +from urllib3.exceptions import NewConnectionError class DBManager(object): def __init__(self, server): self.server = server self.logger = getLogger() + self.bucket = "varken" + if self.server.url == "influxdb.domain.tld": self.logger.critical("You have not configured your varken.ini. Please read Wiki page for configuration") exit() - self.influx = InfluxDBClient(host=self.server.url, port=self.server.port, username=self.server.username, - password=self.server.password, ssl=self.server.ssl, database='varken', - verify_ssl=self.server.verify_ssl) + + url = self.server.url + if 'http' not in url: + scheme = 'http' + if self.server.ssl: + scheme = 'https' + url = "{}://{}:{}".format(scheme, self.server.url, self.server.port) + token = f'{self.server.username}:{self.server.password}' + + self.influx = InfluxDBClient(url=url, token=token, + verify_ssl=self.server.verify_ssl, org=self.server.org) + try: - version = self.influx.request('ping', expected_response_code=204).headers['X-Influxdb-Version'] + version = self.influx.version() self.logger.info('Influxdb version: %s', version) - except ConnectionError: - self.logger.critical("Error testing connection to InfluxDB. Please check your url/hostname") + match = re.match(r'(\d+)\.', version) + if match: + self.version = int(match[1]) + self.logger.info("Using InfluxDB API v%s", self.version) + else: + self.logger.critical("Unknown influxdb version") + exit(1) + except NewConnectionError: + self.logger.critical("Error getting InfluxDB version number. Please check your url/hostname are valid") exit(1) - databases = [db['name'] for db in self.influx.get_list_database()] + if self.version >= 2: + self.create_v2_bucket() + else: + self.create_v1_database() - if 'varken' not in databases: + def create_v2_bucket(self): + if not self.influx.buckets_api().find_bucket_by_name(self.bucket): + self.logger.info("Creating varken bucket") + + retention = BucketRetentionRules(type="expire", every_seconds=60 * 60 * 24 * 30, + shard_group_duration_seconds=60 * 60) + self.influx.buckets_api().create_bucket(bucket_name=self.bucket, + retention_rules=retention) + + def create_v1_database(self): + from influxdb import InfluxDBClient + client = InfluxDBClient(host=self.server.url, port=self.server.port, username=self.server.username, + password=self.server.password, ssl=self.server.ssl, database=self.bucket, + verify_ssl=self.server.verify_ssl) + databases = [db['name'] for db in client.get_list_database()] + + if self.bucket not in databases: self.logger.info("Creating varken database") - self.influx.create_database('varken') + client.create_database(self.bucket) retention_policies = [policy['name'] for policy in - self.influx.get_list_retention_policies(database='varken')] + client.get_list_retention_policies(database=self.bucket)] if 'varken 30d-1h' not in retention_policies: self.logger.info("Creating varken retention policy (30d-1h)") - self.influx.create_retention_policy(name='varken 30d-1h', duration='30d', replication='1', - database='varken', default=True, shard_duration='1h') + client.create_retention_policy(name='varken 30d-1h', duration='30d', replication='1', + database=self.bucket, default=True, shard_duration='1h') + + self.bucket = f'{self.bucket}/varken 30d-1h' def write_points(self, data): d = data self.logger.debug('Writing Data to InfluxDB %s', d) + write_api = self.influx.write_api(write_options=SYNCHRONOUS) try: - self.influx.write_points(d) - except (InfluxDBServerError, ConnectionError) as e: + write_api.write(bucket=self.bucket, record=data) + except (InfluxDBError, NewConnectionError) as e: self.logger.error('Error writing data to influxdb. Dropping this set of data. ' 'Check your database! Error: %s', e) diff --git a/varken/iniparser.py b/varken/iniparser.py index bcb3b37d..3eca865f 100644 --- a/varken/iniparser.py +++ b/varken/iniparser.py @@ -154,13 +154,15 @@ def parse_opts(self, read_file=False): username = env.get('VRKN_INFLUXDB_USERNAME', self.config.get('influxdb', 'username')) password = env.get('VRKN_INFLUXDB_PASSWORD', self.config.get('influxdb', 'password')) + + org = env.get('VRKN_INFLUXDB_ORG', self.config.get('influxdb', 'org')) except NoOptionError as e: self.logger.error('Missing key in %s. Error: %s', "influxdb", e) self.rectify_ini() return self.influx_server = InfluxServer(url=url, port=port, username=username, password=password, ssl=ssl, - verify_ssl=verify_ssl) + verify_ssl=verify_ssl, org=org) # Check for all enabled services for service in self.services: diff --git a/varken/structures.py b/varken/structures.py index e3ee0940..2ec78ff9 100644 --- a/varken/structures.py +++ b/varken/structures.py @@ -18,6 +18,7 @@ class InfluxServer(NamedTuple): url: str = 'localhost' username: str = 'root' verify_ssl: bool = False + org: str = '-' class SonarrServer(NamedTuple): diff --git a/varken/tautulli.py b/varken/tautulli.py index 746685fd..371ece45 100644 --- a/varken/tautulli.py +++ b/varken/tautulli.py @@ -2,7 +2,7 @@ from requests import Session, Request from geoip2.errors import AddressNotFoundError from datetime import datetime, timezone, date, timedelta -from influxdb.exceptions import InfluxDBClientError +from influxdb_client.client.exceptions import InfluxDBError from varken.structures import TautulliStream from varken.helpers import hashit, connection_handler, itemgetter_with_default @@ -363,7 +363,7 @@ def get_historical(self, days=30): ) try: self.dbmanager.write_points(influx_payload) - except InfluxDBClientError as e: + except InfluxDBError as e: if "beyond retention policy" in str(e): self.logger.debug('Only imported 30 days of data per retention policy') else: From 1e71ec56f3d6dad0cced8e759b364d0516f2db6f Mon Sep 17 00:00:00 2001 From: Nathan Adams Date: Sun, 17 Jul 2022 21:17:28 +0200 Subject: [PATCH 2/4] Document that influxdb 2.x is supported --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index fe95bca1..f753956b 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ ecosystem into InfluxDB using Grafana for a frontend Requirements: * [Python 3.6.7+](https://www.python.org/downloads/release/python-367/) * [Python3-pip](https://pip.pypa.io/en/stable/installing/) -* [InfluxDB 1.8.x](https://www.influxdata.com/) +* [InfluxDB 1.8.x or 2.x](https://www.influxdata.com/) * [Grafana](https://grafana.com/)

@@ -50,7 +50,7 @@ Please read [Asking for Support](https://wiki.cajun.pro/books/varken/chapter/ask ### InfluxDB [InfluxDB Installation Documentation](https://wiki.cajun.pro/books/varken/page/influxdb-d1f) -Note: Only v1.8.x is currently supported. +Note: Only v1.8.x or v2.x are supported. Influxdb is required but not packaged as part of Varken. Varken will create its database on its own. If you choose to give varken user permissions that From aa9f2594d920ee5f9e1a9a1a11ab83e266afaedd Mon Sep 17 00:00:00 2001 From: Nathan Adams Date: Sun, 17 Jul 2022 21:33:42 +0200 Subject: [PATCH 3/4] Include influxdb username/password for v2 server support --- varken/dbmanager.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/varken/dbmanager.py b/varken/dbmanager.py index 864d222f..72986b40 100644 --- a/varken/dbmanager.py +++ b/varken/dbmanager.py @@ -43,6 +43,10 @@ def __init__(self, server): exit(1) if self.version >= 2: + # If we pass username/password to a v1 server, it breaks :( + self.influx = InfluxDBClient(url=url, username=self.server.username, + password=self.server.password, + verify_ssl=self.server.verify_ssl, org=self.server.org) self.create_v2_bucket() else: self.create_v1_database() From 052be06e45636b0af019902b858cd322b269e509 Mon Sep 17 00:00:00 2001 From: Nathan Adams Date: Sun, 17 Jul 2022 21:54:39 +0200 Subject: [PATCH 4/4] Support an optional v prefix for influxdb version strings --- varken/dbmanager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/varken/dbmanager.py b/varken/dbmanager.py index 72986b40..8250ce3c 100644 --- a/varken/dbmanager.py +++ b/varken/dbmanager.py @@ -31,7 +31,7 @@ def __init__(self, server): try: version = self.influx.version() self.logger.info('Influxdb version: %s', version) - match = re.match(r'(\d+)\.', version) + match = re.match(r'v?(\d+)\.', version) if match: self.version = int(match[1]) self.logger.info("Using InfluxDB API v%s", self.version)