-
Notifications
You must be signed in to change notification settings - Fork 0
/
modified-restx.py
1899 lines (1576 loc) · 77.6 KB
/
modified-restx.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#
# Copyright (c) 2009-2015 Tom Keffer <[email protected]>
#
# See the file LICENSE.txt for your full rights.
#
"""Publish weather data to RESTful sites such as the Weather Underground.
GENERAL ARCHITECTURE
Each protocol uses two classes:
o A weewx service, that runs in the main thread. Call this the
"controlling object"
o A separate "threading" class that runs in its own thread. Call this the
"posting object".
Communication between the two is via an instance of Queue.Queue. New loop
packets or archive records are put into the queue by the controlling object
and received by the posting object. Details below.
The controlling object should inherit from StdRESTful. The controlling object
is responsible for unpacking any configuration information from weewx.conf, and
supplying any defaults. It sets up the queue. It arranges for any new LOOP or
archive records to be put in the queue. It then launches the thread for the
posting object.
When a new LOOP or record arrives, the controlling object puts it in the queue,
to be received by the posting object. The controlling object can tell the
posting object to terminate by putting a 'None' in the queue.
The posting object should inherit from class RESTThread. It monitors the queue
and blocks until a new record arrives.
The base class RESTThread has a lot of functionality, so specializing classes
should only have to implement a few functions. In particular,
- format_url(self, record). This function takes a record dictionary as an
argument. It is responsible for formatting it as an appropriate URL.
For example, the station registry's version emits strings such as
http://weewx.com/register/register.cgi?weewx_info=2.6.0a5&python_info= ...
- skip_this_post(self, time_ts). If this function returns True, then the
post will be skipped. Otherwise, it is done. The default version does two
checks. First, it sees how old the record is. If it is older than the value
'stale', then the post is skipped. Second, it will not allow posts more
often than 'post_interval'. Both of these can be set in the constructor of
RESTThread.
- post_request(self, request). This function takes a urllib2.Request object
and is responsible for performing the HTTP GET or POST. The default version
simply uses urllib2.urlopen(request) and returns the result. If the post
could raise an unusual exception, override this function and catch the
exception. See the WOWThread implementation for an example.
- check_response(). After an HTTP request gets posted, the webserver sends
back a "response." This response may contain clues as to whether the post
worked. By overriding check_response() you can look for these clues. For
example, the station registry checks all lines in the response, looking for
any that start with the string "FAIL". If it finds one, it raises a
FailedPost exception, signaling that the post did not work.
In unusual cases, you might also have to implement the following:
- process_record(). The default version is for HTTP GET posts, but if you wish
to do a POST or use a socket, you may need to provide a specialized version.
See the CWOP version, CWOPThread.process_record(), for an example that
uses sockets.
"""
from __future__ import with_statement
import Queue
import datetime
import hashlib
import httplib
import platform
import re
import socket
import sys
import syslog
import threading
import time
import urllib
import urllib2
import weedb
import weeutil.weeutil
import weewx.engine
from weeutil.weeutil import to_int, to_float, to_bool, timestamp_to_string, search_up, \
accumulateLeaves, to_sorted_string
import weewx.manager
import weewx.units
class FailedPost(IOError):
"""Raised when a post fails after trying the max number of allowed times"""
class AbortedPost(StandardError):
"""Raised when a post is aborted by the client."""
class BadLogin(StandardError):
"""Raised when login information is bad or missing."""
class ConnectError(IOError):
"""Raised when unable to get a socket connection."""
class SendError(IOError):
"""Raised when unable to send through a socket."""
# ==============================================================================
# Abstract base classes
# ==============================================================================
class StdRESTful(weewx.engine.StdService):
"""Abstract base class for RESTful weewx services.
Offers a few common bits of functionality."""
def shutDown(self):
"""Shut down any threads"""
if hasattr(self, 'loop_queue') and hasattr(self, 'loop_thread'):
StdRESTful.shutDown_thread(self.loop_queue, self.loop_thread)
if hasattr(self, 'archive_queue') and hasattr(self, 'archive_thread'):
StdRESTful.shutDown_thread(self.archive_queue, self.archive_thread)
@staticmethod
def shutDown_thread(q, t):
"""Function to shut down a thread."""
if q and t.isAlive():
# Put a None in the queue to signal the thread to shutdown
q.put(None)
# Wait up to 20 seconds for the thread to exit:
t.join(20.0)
if t.isAlive():
syslog.syslog(syslog.LOG_ERR,
"restx: Unable to shut down %s thread" % t.name)
else:
syslog.syslog(syslog.LOG_DEBUG,
"restx: Shut down %s thread." % t.name)
# For backwards compatibility with early v2.6 alphas. In particular, the WeatherCloud uploader depends on it.
StdRESTbase = StdRESTful
class RESTThread(threading.Thread):
"""Abstract base class for RESTful protocol threads.
Offers a few bits of common functionality."""
def __init__(self, queue, protocol_name,
essentials={},
manager_dict=None,
post_interval=None, max_backlog=sys.maxint, stale=None,
log_success=True, log_failure=True,
timeout=10, max_tries=3, retry_wait=5, retry_login=3600,
softwaretype="weewx-%s" % weewx.__version__,
skip_upload=False):
"""Initializer for the class RESTThread
Required parameters:
queue: An instance of Queue.Queue where the records will appear.
protocol_name: A string holding the name of the protocol.
Optional parameters:
essentials: A dictionary that holds observation types that must
not be None for the post to go ahead.
manager_dict: A manager dictionary, to be used to open up a
database manager. Default is None.
post_interval: How long to wait between posts.
Default is None (post every record).
max_backlog: How many records are allowed to accumulate in the queue
before the queue is trimmed.
Default is sys.maxint (essentially, allow any number).
stale: How old a record can be and still considered useful.
Default is None (never becomes too old).
log_success: If True, log a successful post in the system log.
Default is True.
log_failure: If True, log an unsuccessful post in the system log.
Default is True.
timeout: How long to wait for the server to respond before giving up.
Default is 10 seconds.
max_tries: How many times to try the post before giving up.
Default is 3
retry_wait: How long to wait between retries when failures.
Default is 5 seconds.
retry_login: How long to wait before retrying a login. Default
is 3600 seconds (one hour).
softwaretype: Sent as field "softwaretype in the Ambient post.
Default is "weewx-x.y.z where x.y.z is the weewx version.
skip_upload: Do all record processing, but do not upload the result.
Useful for diagnostic purposes when local debugging should not
interfere with the downstream data service. Default is False.
"""
# Initialize my superclass:
threading.Thread.__init__(self, name=protocol_name)
self.setDaemon(True)
self.queue = queue
self.protocol_name = protocol_name
self.essentials = essentials
self.manager_dict = manager_dict
self.log_success = to_bool(log_success)
self.log_failure = to_bool(log_failure)
self.max_backlog = to_int(max_backlog)
self.max_tries = to_int(max_tries)
self.stale = to_int(stale)
self.post_interval = to_int(post_interval)
self.timeout = to_int(timeout)
self.retry_wait = to_int(retry_wait)
self.retry_login = to_int(retry_login)
self.softwaretype = softwaretype
self.lastpost = 0
self.skip_upload = to_bool(skip_upload)
def get_record(self, record, dbmanager):
"""Augment record data with additional data from the archive.
Should return results in the same units as the record and the database.
This is a general version that works for:
- WeatherUnderground
- PWSweather
- WOW
- CWOP
It can be overridden and specialized for additional protocols.
returns: A dictionary of weather values"""
# this will not work without a dbmanager
if dbmanager is None:
return record
_time_ts = record['dateTime']
_sod_ts = weeutil.weeutil.startOfDay(_time_ts)
# Make a copy of the record, then start adding to it:
_datadict = dict(record)
# If the type 'rain' does not appear in the archive schema,
# or the database is locked, an exception will be raised. Be prepared
# to catch it.
try:
if 'hourRain' not in _datadict:
# CWOP says rain should be "rain that fell in the past hour".
# WU says it should be "the accumulated rainfall in the past
# 60 min". Presumably, this is exclusive of the archive record
# 60 minutes before, so the SQL statement is exclusive on the
# left, inclusive on the right.
_result = dbmanager.getSql(
"SELECT SUM(rain), MIN(usUnits), MAX(usUnits) FROM %s "
"WHERE dateTime>? AND dateTime<=?" %
dbmanager.table_name, (_time_ts - 3600.0, _time_ts))
if _result is not None and _result[0] is not None:
if not _result[1] == _result[2] == record['usUnits']:
raise ValueError("Inconsistent units (%s vs %s vs %s) when querying for hourRain" %
(_result[1], _result[2], record['usUnits']))
_datadict['hourRain'] = _result[0]
else:
_datadict['hourRain'] = None
if 'rain24' not in _datadict:
# Similar issue, except for last 24 hours:
_result = dbmanager.getSql(
"SELECT SUM(rain), MIN(usUnits), MAX(usUnits) FROM %s "
"WHERE dateTime>? AND dateTime<=?" %
dbmanager.table_name, (_time_ts - 24 * 3600.0, _time_ts))
if _result is not None and _result[0] is not None:
if not _result[1] == _result[2] == record['usUnits']:
raise ValueError("Inconsistent units (%s vs %s vs %s) when querying for rain24" %
(_result[1], _result[2], record['usUnits']))
_datadict['rain24'] = _result[0]
else:
_datadict['rain24'] = None
if 'dayRain' not in _datadict:
# NB: The WU considers the archive with time stamp 00:00
# (midnight) as (wrongly) belonging to the current day
# (instead of the previous day). But, it's their site,
# so we'll do it their way. That means the SELECT statement
# is inclusive on both time ends:
_result = dbmanager.getSql(
"SELECT SUM(rain), MIN(usUnits), MAX(usUnits) FROM %s "
"WHERE dateTime>=? AND dateTime<=?" %
dbmanager.table_name, (_sod_ts, _time_ts))
if _result is not None and _result[0] is not None:
if not _result[1] == _result[2] == record['usUnits']:
raise ValueError("Inconsistent units (%s vs %s vs %s) when querying for dayRain" %
(_result[1], _result[2], record['usUnits']))
_datadict['dayRain'] = _result[0]
else:
_datadict['dayRain'] = None
except weedb.OperationalError as e:
syslog.syslog(syslog.LOG_DEBUG,
"restx: %s: Database OperationalError '%s'" %
(self.protocol_name, e))
return _datadict
def run(self):
"""If there is a database specified, open the database, then call
run_loop() with the database. If no database is specified, simply
call run_loop()."""
# Open up the archive. Use a 'with' statement. This will automatically
# close the archive in the case of an exception:
if self.manager_dict is not None:
with weewx.manager.open_manager(self.manager_dict) as _manager:
self.run_loop(_manager)
else:
self.run_loop()
def run_loop(self, dbmanager=None):
"""Runs a continuous loop, waiting for records to appear in the queue,
then processing them.
"""
while True:
while True:
# This will block until something appears in the queue:
_record = self.queue.get()
# A None record is our signal to exit:
if _record is None:
return
# If packets have backed up in the queue, trim it until it's
# no bigger than the max allowed backlog:
if self.queue.qsize() <= self.max_backlog:
break
if self.skip_this_post(_record['dateTime']):
continue
try:
# Process the record, using whatever method the specializing
# class provides
self.process_record(_record, dbmanager)
except AbortedPost as e:
if self.log_success:
_time_str = timestamp_to_string(_record['dateTime'])
syslog.syslog(syslog.LOG_INFO,
"restx: %s: Skipped record %s: %s" %
(self.protocol_name, _time_str, e))
except BadLogin:
syslog.syslog(syslog.LOG_ERR, "restx: %s: Bad login; "
"waiting %s minutes then retrying" %
(self.protocol_name, self.retry_login / 60.0))
time.sleep(self.retry_login)
except FailedPost as e:
if self.log_failure:
_time_str = timestamp_to_string(_record['dateTime'])
syslog.syslog(syslog.LOG_ERR,
"restx: %s: Failed to publish record %s: %s"
% (self.protocol_name, _time_str, e))
except Exception as e:
# Some unknown exception occurred. This is probably a serious
# problem. Exit.
syslog.syslog(syslog.LOG_CRIT,
"restx: %s: Unexpected exception of type %s" %
(self.protocol_name, type(e)))
weeutil.weeutil.log_traceback('*** ', syslog.LOG_DEBUG)
syslog.syslog(syslog.LOG_CRIT,
"restx: %s: Thread exiting. Reason: %s" %
(self.protocol_name, e))
return
else:
if self.log_success:
_time_str = timestamp_to_string(_record['dateTime'])
syslog.syslog(syslog.LOG_INFO,
"restx: %s: Published record %s" %
(self.protocol_name, _time_str))
def process_record(self, record, dbmanager):
"""Default version of process_record.
This version uses HTTP GETs to do the post, which should work for many
protocols, but it can always be replaced by a specializing class."""
# Get the full record by querying the database ...
_full_record = self.get_record(record, dbmanager)
# ... check it ...
self.check_this_record(_full_record)
# ... format the URL, using the relevant protocol ...
_url = self.format_url(_full_record)
# ... get the Request to go with it...
_request = self.get_request(_url)
# ... get any POST payload...
_payload = self.get_post_body(_full_record)
# ... add a proper Content-Type if needed...
if _payload:
_request.add_header('Content-Type', _payload[1])
data = _payload[0]
else:
data = None
# ... check to see if this is just a drill...
if self.skip_upload:
raise AbortedPost("Skip post")
# ... then, finally, post it
self.post_with_retries(_request, data)
def get_request(self, url):
"""Get a request object. This can be overridden to add any special headers."""
_request = urllib2.Request(url)
_request.add_header("User-Agent", "weewx/%s" % weewx.__version__)
return _request
def post_with_retries(self, request, data=None):
"""Post a request, retrying if necessary
Attempts to post the request object up to max_tries times.
Catches a set of generic exceptions.
request: An instance of urllib2.Request
data: The body of the POST. If not given, the request will be done as a GET.
"""
# Retry up to max_tries times:
for _count in range(self.max_tries):
try:
# Do a single post. The function post_request() can be
# specialized by a RESTful service to catch any unusual
# exceptions.
_response = self.post_request(request, data)
if 200 <= _response.code <= 299:
# No exception thrown and we got a good response code, but
# we're still not done. Some protocols encode a bad
# station ID or password in the return message.
# Give any interested protocols a chance to examine it.
# This must also be inside the try block because some
# implementations defer hitting the socket until the
# response is used.
self.check_response(_response)
# Does not seem to be an error. We're done.
return
# We got a bad response code. By default, log it and try again.
# Provide method for derived classes to behave otherwise if
# necessary.
self.handle_code(_response.code, _count + 1)
except (urllib2.URLError, socket.error, httplib.HTTPException) as e:
# An exception was thrown. By default, log it and try again.
# Provide method for derived classes to behave otherwise if
# necessary.
self.handle_exception(e, _count + 1)
time.sleep(self.retry_wait)
else:
# This is executed only if the loop terminates normally, meaning
# the upload failed max_tries times. Raise an exception. Caller
# can decide what to do with it.
raise FailedPost("Failed upload after %d tries" % (self.max_tries,))
def check_this_record(self, record):
"""Raises exception AbortedPost if the record should not be posted.
Otherwise, does nothing"""
for obs_type in self.essentials:
if self.essentials[obs_type] and record.get(obs_type) is None:
raise AbortedPost("Observation type %s missing" % obs_type)
def check_response(self, response):
"""Check the response from a HTTP post. This version does nothing."""
pass
def handle_code(self, code, count):
"""Check code from HTTP post. This simply logs the response."""
syslog.syslog(syslog.LOG_DEBUG,
"restx: %s: Failed upload attempt %d: Code %s" %
(self.protocol_name, count, code))
def handle_exception(self, e, count):
"""Check exception from HTTP post. This simply logs the exception."""
syslog.syslog(syslog.LOG_DEBUG,
"restx: %s: Failed upload attempt %d: %s" %
(self.protocol_name, count, e))
def post_request(self, request, data=None):
"""Post a request object. This version does not catch any HTTP
exceptions.
Specializing versions can can catch any unusual exceptions that might
get raised by their protocol.
request: An instance of urllib2.Request
data: If given, the request will be done as a POST. Otherwise,
as a GET. [optional]
"""
try:
# Python 2.5 and earlier do not have a "timeout" parameter.
# Including one could cause a TypeError exception. Be prepared
# to catch it.
_response = urllib2.urlopen(request, data=data, timeout=self.timeout)
except TypeError:
# Must be Python 2.5 or early. Use a simple, unadorned request
_response = urllib2.urlopen(request, data=data)
return _response
def skip_this_post(self, time_ts):
"""Check whether the post is current"""
# Don't post if this record is too old
if self.stale is not None:
_how_old = time.time() - time_ts
if _how_old > self.stale:
syslog.syslog(
syslog.LOG_DEBUG,
"restx: %s: record %s is stale (%d > %d)." %
(self.protocol_name, timestamp_to_string(time_ts),
_how_old, self.stale))
return True
if self.post_interval is not None:
# We don't want to post more often than the post interval
_how_long = time_ts - self.lastpost
if _how_long < self.post_interval:
syslog.syslog(
syslog.LOG_DEBUG,
"restx: %s: wait interval (%d < %d) has not passed for record %s" %
(self.protocol_name, _how_long, self.post_interval,
timestamp_to_string(time_ts)))
return True
self.lastpost = time_ts
return False
def get_post_body(self, record): # @UnusedVariable
"""Return any POST payload.
The returned value should be a 2-way tuple. First element is the Python
object to be included as the payload. Second element is the MIME type it
is in (such as "application/json").
Return a simple 'None' if there is no POST payload. This is the default.
"""
# Maintain backwards compatibility with the old format_data() function.
body = self.format_data(record)
if body:
return (body, 'application/x-www-form-urlencoded')
return None
def format_data(self, record): # @UnusedVariable
"""Return a POST payload as an urlencoded object.
DEPRECATED. Use get_post_body() instead.
"""
return None
# ==============================================================================
# Ambient protocols
# ==============================================================================
class StdWunderground(StdRESTful):
"""Specialized version of the Ambient protocol for the Weather Underground.
"""
# the rapidfire URL:
rf_url = "https://rtupdate.wunderground.com/weatherstation/updateweatherstation.php"
# the personal weather station URL:
pws_url = "https://weatherstation.wunderground.com/weatherstation/updateweatherstation.php"
def __init__(self, engine, config_dict):
super(StdWunderground, self).__init__(engine, config_dict)
_ambient_dict = get_site_dict(
config_dict, 'Wunderground', 'station', 'password')
if _ambient_dict is None:
return
_essentials_dict = search_up(config_dict['StdRESTful']['Wunderground'], 'Essentials', {})
syslog.syslog(syslog.LOG_DEBUG, "restx: WU essentials: %s" % _essentials_dict)
# Get the manager dictionary:
_manager_dict = weewx.manager.get_manager_dict_from_config(
config_dict, 'wx_binding')
# The default is to not do an archive post if a rapidfire post
# has been specified, but this can be overridden
do_rapidfire_post = to_bool(_ambient_dict.pop('rapidfire', False))
do_archive_post = to_bool(_ambient_dict.pop('archive_post',
not do_rapidfire_post))
if do_archive_post:
_ambient_dict.setdefault('server_url', StdWunderground.pws_url)
self.archive_queue = Queue.Queue()
self.archive_thread = AmbientThread(
self.archive_queue,
_manager_dict,
protocol_name="Wunderground-PWS",
essentials=_essentials_dict,
**_ambient_dict)
self.archive_thread.start()
self.bind(weewx.NEW_ARCHIVE_RECORD, self.new_archive_record)
syslog.syslog(syslog.LOG_INFO, "restx: Wunderground-PWS: "
"Data for station %s will be posted" %
_ambient_dict['station'])
if do_rapidfire_post:
_ambient_dict.setdefault('server_url', StdWunderground.rf_url)
_ambient_dict.setdefault('log_success', False)
_ambient_dict.setdefault('log_failure', False)
_ambient_dict.setdefault('max_backlog', 0)
_ambient_dict.setdefault('max_tries', 1)
_ambient_dict.setdefault('rtfreq', 2.5)
self.cached_values = CachedValues()
self.loop_queue = Queue.Queue()
self.loop_thread = AmbientLoopThread(
self.loop_queue,
_manager_dict,
protocol_name="Wunderground-RF",
essentials=_essentials_dict,
**_ambient_dict)
self.loop_thread.start()
self.bind(weewx.NEW_LOOP_PACKET, self.new_loop_packet)
syslog.syslog(syslog.LOG_INFO, "restx: Wunderground-RF: "
"Data for station %s will be posted" %
_ambient_dict['station'])
def new_loop_packet(self, event):
"""Puts new LOOP packets in the loop queue"""
if weewx.debug >= 3:
syslog.syslog(syslog.LOG_DEBUG, "restx: raw packet: %s" % to_sorted_string(event.packet))
self.cached_values.update(event.packet, event.packet['dateTime'])
if weewx.debug >= 3:
syslog.syslog(syslog.LOG_DEBUG, "restx: cached packet: %s" %
to_sorted_string(self.cached_values.get_packet(event.packet['dateTime'])))
self.loop_queue.put(
self.cached_values.get_packet(event.packet['dateTime']))
def new_archive_record(self, event):
"""Puts new archive records in the archive queue"""
self.archive_queue.put(event.record)
class StdCustomApi(StdRESTful):
"""Specialized version of the Ambient protocol for your custom API.
"""
def __init__(self, engine, config_dict):
super(StdCustomApi, self).__init__(engine, config_dict)
_ambient_dict = get_site_dict(
config_dict, 'CustomAPI', 'station', 'password')
if _ambient_dict is None:
return
_essentials_dict = search_up(config_dict['StdRESTful']['CustomAPI'], 'Essentials', {})
syslog.syslog(syslog.LOG_DEBUG, "restx: CustomAPI essentials: %s" % _essentials_dict)
# Get the manager dictionary:
_manager_dict = weewx.manager.get_manager_dict_from_config(
config_dict, 'wx_binding')
# The default is to not do an archive post if a rapidfire post
# has been specified, but this can be overridden
do_rapidfire_post = to_bool(_ambient_dict.pop('rapidfire', False))
do_archive_post = to_bool(_ambient_dict.pop('archive_post',
not do_rapidfire_post))
# Get the URL of the custom API
apiURLConfig = _ambient_dict.pop('apiURL')
if do_archive_post:
_ambient_dict.setdefault('server_url', apiURLConfig)
self.archive_queue = Queue.Queue()
self.archive_thread = AmbientThread(
self.archive_queue,
_manager_dict,
protocol_name="CustomAPI",
essentials=_essentials_dict,
**_ambient_dict)
self.archive_thread.start()
self.bind(weewx.NEW_ARCHIVE_RECORD, self.new_archive_record)
syslog.syslog(syslog.LOG_INFO, "restx: CustomAPI: "
"Data for station %s will be posted" %
_ambient_dict['station'])
if do_rapidfire_post:
_ambient_dict.setdefault('server_url', apiURLConfig)
_ambient_dict.setdefault('log_success', False)
_ambient_dict.setdefault('log_failure', False)
_ambient_dict.setdefault('max_backlog', 0)
_ambient_dict.setdefault('max_tries', 1)
_ambient_dict.setdefault('rtfreq', 2.5)
self.cached_values = CachedValues()
self.loop_queue = Queue.Queue()
self.loop_thread = AmbientLoopThread(
self.loop_queue,
_manager_dict,
protocol_name="CustomAPI",
essentials=_essentials_dict,
**_ambient_dict)
self.loop_thread.start()
self.bind(weewx.NEW_LOOP_PACKET, self.new_loop_packet)
syslog.syslog(syslog.LOG_INFO, "restx: CustomAPI: "
"Data for station %s will be posted" %
_ambient_dict['station'])
def new_loop_packet(self, event):
"""Puts new LOOP packets in the loop queue"""
if weewx.debug >= 3:
syslog.syslog(syslog.LOG_DEBUG, "restx: raw packet: %s" % to_sorted_string(event.packet))
self.cached_values.update(event.packet, event.packet['dateTime'])
if weewx.debug >= 3:
syslog.syslog(syslog.LOG_DEBUG, "restx: cached packet: %s" %
to_sorted_string(self.cached_values.get_packet(event.packet['dateTime'])))
self.loop_queue.put(
self.cached_values.get_packet(event.packet['dateTime']))
def new_archive_record(self, event):
"""Puts new archive records in the archive queue"""
self.archive_queue.put(event.record)
class CachedValues(object):
"""Dictionary of value-timestamp pairs. Each timestamp indicates when the
corresponding value was last updated."""
def __init__(self):
self.unit_system = None
self.values = dict()
def update(self, packet, ts):
# update the cache with values from the specified packet, using the
# specified timestamp.
for k in packet:
if k is None:
# well-formed packets do not have None as key, but just in case
continue
elif k == 'dateTime':
# do not cache the timestamp
continue
elif k == 'usUnits':
# assume unit system of first packet, then enforce consistency
if self.unit_system is None:
self.unit_system = packet['usUnits']
elif packet['usUnits'] != self.unit_system:
raise ValueError("Mixed units encountered in cache. %s vs %s" % \
(self.unit_system, packet['usUnits']))
else:
# cache each value, associating it with the it was cached
self.values[k] = {'value': packet[k], 'ts': ts}
def get_value(self, k, ts, stale_age):
# get the value for the specified key. if the value is older than
# stale_age (seconds) then return None.
if k in self.values and ts - self.values[k]['ts'] < stale_age:
return self.values[k]['value']
return None
def get_packet(self, ts=None, stale_age=960):
if ts is None:
ts = int(time.time() + 0.5)
pkt = {'dateTime': ts, 'usUnits': self.unit_system}
for k in self.values:
pkt[k] = self.get_value(k, ts, stale_age)
return pkt
class StdPWSWeather(StdRESTful):
"""Specialized version of the Ambient protocol for PWSWeather"""
# The URL used by PWSWeather:
archive_url = "http://www.pwsweather.com/pwsupdate/pwsupdate.php"
def __init__(self, engine, config_dict):
super(StdPWSWeather, self).__init__(engine, config_dict)
_ambient_dict = get_site_dict(
config_dict, 'PWSweather', 'station', 'password')
if _ambient_dict is None:
return
# Get the manager dictionary:
_manager_dict = weewx.manager.get_manager_dict_from_config(
config_dict, 'wx_binding')
_ambient_dict.setdefault('server_url', StdPWSWeather.archive_url)
self.archive_queue = Queue.Queue()
self.archive_thread = AmbientThread(self.archive_queue, _manager_dict,
protocol_name="PWSWeather",
**_ambient_dict)
self.archive_thread.start()
self.bind(weewx.NEW_ARCHIVE_RECORD, self.new_archive_record)
syslog.syslog(syslog.LOG_INFO, "restx: PWSWeather: "
"Data for station %s will be posted" %
_ambient_dict['station'])
def new_archive_record(self, event):
self.archive_queue.put(event.record)
# For backwards compatibility with early alpha versions:
StdPWSweather = StdPWSWeather
class StdWOW(StdRESTful):
"""Upload using the UK Met Office's WOW protocol.
For details of the WOW upload protocol, see
http://wow.metoffice.gov.uk/support/dataformats#dataFileUpload
"""
# The URL used by WOW:
archive_url = "http://wow.metoffice.gov.uk/automaticreading"
def __init__(self, engine, config_dict):
super(StdWOW, self).__init__(engine, config_dict)
_ambient_dict = get_site_dict(
config_dict, 'WOW', 'station', 'password')
if _ambient_dict is None:
return
# Get the manager dictionary:
_manager_dict = weewx.manager.get_manager_dict_from_config(
config_dict, 'wx_binding')
_ambient_dict.setdefault('server_url', StdWOW.archive_url)
_ambient_dict.setdefault('post_interval', 900)
self.archive_queue = Queue.Queue()
self.archive_thread = WOWThread(self.archive_queue, _manager_dict,
protocol_name="WOW",
**_ambient_dict)
self.archive_thread.start()
self.bind(weewx.NEW_ARCHIVE_RECORD, self.new_archive_record)
syslog.syslog(syslog.LOG_INFO, "restx: WOW: "
"Data for station %s will be posted" %
_ambient_dict['station'])
def new_archive_record(self, event):
self.archive_queue.put(event.record)
class AmbientThread(RESTThread):
"""Concrete class for threads posting from the archive queue,
using the Ambient PWS protocol."""
def __init__(self,
queue,
manager_dict,
station, password, server_url,
post_indoor_observations=False,
protocol_name="Unknown-Ambient",
essentials={},
post_interval=None, max_backlog=sys.maxint, stale=None,
log_success=True, log_failure=True,
timeout=10, max_tries=3, retry_wait=5, retry_login=3600,
softwaretype="weewx-%s" % weewx.__version__,
skip_upload=False):
"""
Initializer for the AmbientThread class.
Parameters specific to this class:
station: The name of the station. For example, for the WU, this
would be something like "KORHOODR3".
password: Password used for the station.
server_url: An url where the server for this protocol can be found.
"""
super(AmbientThread, self).__init__(queue,
protocol_name=protocol_name,
essentials=essentials,
manager_dict=manager_dict,
post_interval=post_interval,
max_backlog=max_backlog,
stale=stale,
log_success=log_success,
log_failure=log_failure,
timeout=timeout,
max_tries=max_tries,
retry_wait=retry_wait,
retry_login=retry_login,
softwaretype=softwaretype,
skip_upload=skip_upload)
self.station = station
self.password = password
self.server_url = server_url
self.formats = dict(AmbientThread._FORMATS)
if to_bool(post_indoor_observations):
self.formats.update(AmbientThread._INDOOR_FORMATS)
# Types and formats of the data to be published:
_FORMATS = {'dateTime' : 'dateutc=%s',
'barometer' : 'baromin=%.3f',
'outTemp' : 'tempf=%.1f',
'outHumidity': 'humidity=%03.0f',
'windSpeed' : 'windspeedmph=%03.1f',
'windDir' : 'winddir=%03.0f',
'windGust' : 'windgustmph=%03.1f',
'dewpoint' : 'dewptf=%.1f',
'hourRain' : 'rainin=%.2f',
'dayRain' : 'dailyrainin=%.2f',
'radiation' : 'solarradiation=%.2f',
'UV' : 'UV=%.2f',
# The following four formats have been commented out until the WU
# fixes the bug that causes them to be displayed as soil moisture.
# 'extraTemp1' : "temp2f=%.1f",
# 'extraTemp2' : "temp3f=%.1f",
# 'extraTemp3' : "temp4f=%.1f",
# 'extraTemp4' : "temp5f=%.1f",
'soilTemp1' : "soiltempf=%.1f",
'soilTemp2' : "soiltemp2f=%.1f",
'soilTemp3' : "soiltemp3f=%.1f",
'soilTemp4' : "soiltemp4f=%.1f",
'soilMoist1' : "soilmoisture=%03.0f",
'soilMoist2' : "soilmoisture2=%03.0f",
'soilMoist3' : "soilmoisture3=%03.0f",
'soilMoist4' : "soilmoisture4=%03.0f",
'leafWet1' : "leafwetness=%03.0f",
'leafWet2' : "leafwetness2=%03.0f",
'realtime' : 'realtime=%d',
'rtfreq' : 'rtfreq=%.1f'}
_INDOOR_FORMATS = {
'inTemp' : 'indoortempf=%.1f',
'inHumidity': 'indoorhumidity=%.0f'}
def format_url(self, incoming_record):
"""Return an URL for posting using the Ambient protocol."""
record = weewx.units.to_US(incoming_record)
_liststr = ["action=updateraw",
"ID=%s" % self.station,
"PASSWORD=%s" % urllib.quote(self.password),
"softwaretype=%s" % self.softwaretype]
# Go through each of the supported types, formatting it, then adding
# to _liststr:
for _key in self.formats:
_v = record.get(_key)
# Check to make sure the type is not null
if _v is not None:
if _key == 'dateTime':
# For dates, convert from time stamp to a string, using
# what the Weather Underground calls "MySQL format." I've
# fiddled with formatting, and it seems that escaping the
# colons helps its reliability. But, I could be imagining
# things.
_v = urllib.quote(str(datetime.datetime.utcfromtimestamp(_v)))
# Format the value, and accumulate in _liststr:
_liststr.append(self.formats[_key] % _v)
# Now stick all the pieces together with an ampersand between them:
_urlquery = '&'.join(_liststr)
# This will be the complete URL for the HTTP GET:
_url = "%s?%s" % (self.server_url, _urlquery)
# show the url in the logs for debug, but mask any password
if weewx.debug >= 2:
syslog.syslog(syslog.LOG_DEBUG, "restx: Ambient: url: %s" %
re.sub(r"PASSWORD=[^\&]*", "PASSWORD=XXX", _url))
return _url
def check_response(self, response):
"""Check the HTTP response code for an Ambient related error."""
for line in response:
# PWSweather signals with 'ERROR', WU with 'INVALID':
if line.startswith('ERROR') or line.startswith('INVALID'):
# Bad login. No reason to retry. Raise an exception.
raise BadLogin(line)
class AmbientLoopThread(AmbientThread):
"""Version used for the Rapidfire protocol."""
def __init__(self, queue, manager_dict,
station, password, server_url,
protocol_name="Unknown-Ambient",
essentials={},
post_interval=None, max_backlog=sys.maxint, stale=None,
log_success=True, log_failure=True,
timeout=10, max_tries=3, retry_wait=5, rtfreq=2.5):
"""
Initializer for the AmbientLoopThread class.
Parameters specific to this class:
rtfreq: Frequency of update in seconds for RapidFire
"""
super(AmbientLoopThread, self).__init__(queue,