Skip to content

Commit

Permalink
Merge pull request #4 from ICRAR/python3
Browse files Browse the repository at this point in the history
Python3
  • Loading branch information
NickSwainston authored Jan 15, 2020
2 parents cc42be8 + 363e569 commit 687d7a1
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 52 deletions.
22 changes: 11 additions & 11 deletions mwa_pulsar_client/mwa_pulsar_client/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# -*- coding: utf8 -*-
import os
import urllib
import urllib.parse
import requests


Expand All @@ -16,7 +16,7 @@ def detection_find_calibrator(addr, auth, **kwargs):
path = '{0}/{1}/'.format(addr, 'detection_find_calibrator')
r = requests.get(url=path,
auth=auth,
params=urllib.urlencode(kwargs))
params=urllib.parse.urlencode(kwargs))
r.raise_for_status()
return r.json()

Expand All @@ -32,7 +32,7 @@ def calibration_file_by_observation_id(addr, auth, **kwargs):
path = '{0}/{1}/'.format(addr, 'calibration_file_by_observation_id')
r = requests.get(url=path,
auth=auth,
params=urllib.urlencode(kwargs))
params=urllib.parse.urlencode(kwargs))
r.raise_for_status()
return r.json()

Expand Down Expand Up @@ -62,7 +62,7 @@ def calibrator_get(addr, auth, **kwargs):
path = '{0}/{1}/'.format(addr, 'calibrator_get')
r = requests.get(url=path,
auth=auth,
params=urllib.urlencode(kwargs))
params=urllib.parse.urlencode(kwargs))
r.raise_for_status()
return r.json()

Expand Down Expand Up @@ -107,7 +107,7 @@ def pulsar_get(addr, auth, **kwargs):
path = '{0}/{1}/'.format(addr, 'pulsar_get')
r = requests.get(url=path,
auth=auth,
params=urllib.urlencode(kwargs))
params=urllib.parse.urlencode(kwargs))
r.raise_for_status()
return r.json()

Expand Down Expand Up @@ -154,7 +154,7 @@ def detection_get(addr, auth, **kwargs):
path = '{0}/{1}/'.format(addr, 'detection_get')
r = requests.get(url=path,
auth=auth,
params=urllib.urlencode(kwargs))
params=urllib.parse.urlencode(kwargs))
r.raise_for_status()
return r.json()

Expand Down Expand Up @@ -246,7 +246,7 @@ def detection_file_upload(addr, auth, **kwargs):
raise Exception('filepath not found')
files = {'path': open(filepath, 'rb')}
new_kwargs = {}
for k, v in kwargs.iteritems():
for k, v in kwargs.items():
new_kwargs[k] = str(v)
r = requests.post(url=path, auth=auth, files=files, headers=new_kwargs)
r.raise_for_status()
Expand All @@ -270,7 +270,7 @@ def detection_file_download(addr, auth, filename, outputpath):
params = {'filename': filename}
r = requests.get(url=path,
auth=auth,
params=urllib.urlencode(params),
params=urllib.parse.urlencode(params),
stream=True)
r.raise_for_status()

Expand Down Expand Up @@ -305,7 +305,7 @@ def calibrator_file_upload(addr, auth, **kwargs):
raise Exception('filepath not found')
files = {'path': open(filepath, 'rb')}
new_kwargs = {}
for k, v in kwargs.iteritems():
for k, v in kwargs.items():
new_kwargs[k] = str(v)
r = requests.post(url=path, auth=auth, files=files, headers=new_kwargs)
r.raise_for_status()
Expand All @@ -330,7 +330,7 @@ def calibrator_file_download(addr, auth, filename, outputpath):
params = {'filename': filename}
r = requests.get(url=path,
auth=auth,
params=urllib.urlencode(params),
params=urllib.parse.urlencode(params),
stream=True)
r.raise_for_status()

Expand Down Expand Up @@ -362,6 +362,6 @@ def psrcat(addr, auth, pulsar):
payload = {'name': pulsar, 'format': 'json'}
r = requests.get(url=path,
auth=auth,
params=urllib.urlencode(payload))
params=urllib.parse.urlencode(payload))
r.raise_for_status()
return r.json()
84 changes: 43 additions & 41 deletions scripts/voltdownload.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
#
# ICRAR - International Centre for Radio Astronomy Research
# (c) UWA - The University of Western Australia, 2014
Expand All @@ -25,15 +25,23 @@
import time
import json
import threading
import urllib2
import urllib
import urllib.request
import base64
import time
import datetime
import calendar
from optparse import OptionParser
from multiprocessing import Queue
from Queue import Empty
from queue import Empty, Queue
import logging

# set up the logger for stand-alone execution
logging.basicConfig(format='%(asctime)s l %(lineno)-4d [%(levelname)s] :: %(message)s',
datefmt='%a %b %d %H:%M:%S', level=logging.INFO)
logger = logging.getLogger(__name__)
#ch = logging.StreamHandler()
#formatter = logging.Formatter('%(asctime)s %(name)s %(lineno)-4d %(levelname)-7s :: %(message)s')
#ch.setFormatter(formatter)
#logger.addHandler(ch)

username = 'ngas'
password = 'ngas'
Expand All @@ -56,18 +64,18 @@ def file_error(err):
global ERRORS
with LOCK:
ERRORS.append(err)
print(err)
logging.error(err)

def file_starting(filename):
with LOCK:
print('%s [INFO] Downloading %s' % (time.strftime('%c'), filename))
logging.info('Downloading %s' % (filename))

def file_complete(filename):
global COMPLETE
global TOTAL_FILES
with LOCK:
COMPLETE = COMPLETE + 1
print('%s [INFO] %s complete [%d of %d]' % (time.strftime('%c'), filename,
logging.info('%s complete [%d of %d]' % (filename,
COMPLETE, TOTAL_FILES))

def split_raw_recombined(filename):
Expand Down Expand Up @@ -156,11 +164,8 @@ def query_observation(obs, host, filetype, timefrom, duration):
processRange = True

response = None
try:
url = 'http://%s/metadata/obs/?obs_id=%s&nocache' % (host, str(obs))
request = urllib2.Request(url)
response = urllib2.urlopen(request)

url = 'http://%s/metadata/obs/?obs_id=%s&nocache' % (host, str(obs))
with urllib.request.urlopen(url) as response:
resultbuffer = []
while True:
result = response.read(32768)
Expand All @@ -169,10 +174,10 @@ def query_observation(obs, host, filetype, timefrom, duration):
resultbuffer.append(result)

keymap = {}
files = json.loads(''.join(resultbuffer))['files']
files = json.loads(b''.join(resultbuffer).decode('utf-8'))['files']
if processRange:
second = None
for f, v in files.iteritems():
for f, v in files.items():
ft = v['filetype']
size = v['size']
add = False
Expand All @@ -197,7 +202,7 @@ def query_observation(obs, host, filetype, timefrom, duration):
keymap[f] = size

else:
for f, v in files.iteritems():
for f, v in files.items():
ft = v['filetype']
size = v['size']
if filetype == 11 and ft == 11:
Expand All @@ -211,9 +216,6 @@ def query_observation(obs, host, filetype, timefrom, duration):

return keymap

finally:
if response:
response.close()


def check_complete(filename, size, dir):
Expand Down Expand Up @@ -243,12 +245,12 @@ def download_worker(url, filename, size, out, bufsize, prestage):
try:
file_starting(filename)

request = urllib2.Request(url)
base64string = base64.encodestring('%s:%s' % (username, password)).replace('\n', '')
request = urllib.request.Request(url)
base64string = base64.encodestring(('%s:%s' % (username,password)).encode()).decode().replace('\n', '')
request.add_header('Authorization', 'Basic %s' % base64string)
request.add_header('prestagefilelist', prestage)

u = urllib2.urlopen(request)
u = urllib.request.urlopen(request)
u.fp.bufsize = bufsize

file_size = int(u.headers['Content-Length'])
Expand All @@ -268,17 +270,17 @@ def download_worker(url, filename, size, out, bufsize, prestage):

file_complete(filename)

except urllib2.HTTPError as e:
file_error('%s [ERROR] %s %s' % (time.strftime('%c'), filename, str(e.read()) ))
except urllib.error.HTTPError as e:
file_error('%s %s' % (filename, str(e.read()) ))

except urllib2.URLError as urlerror:
except urllib.error.URLError as urlerror:
if hasattr(urlerror, 'reason'):
file_error('%s [ERROR] %s %s' % (time.strftime('%c'), filename, str(urlerror.reason) ))
file_error('%s %s' % (filename, str(urlerror.reason) ))
else:
file_error('%s [ERROR] %s %s' % (time.strftime('%c'), filename, str(urlerror) ))
file_error('%s %s' % (filename, str(urlerror) ))

except Exception as exp:
file_error('%s [ERROR] %s %s' % (time.strftime('%c'), filename, str(exp) ))
file_error('%s %s' % (filename, str(exp) ))

finally:
if u:
Expand Down Expand Up @@ -335,25 +337,24 @@ def main():
print('Number of simultaneous downloads must be > 0 and <= 12')
sys.exit(-1)

print('%s [INFO] Finding observation %s' % (time.strftime('%c'), options.obs))
logger.info('Finding observation %s' % options.obs)

fileresult = query_observation(options.obs, 'ws.mwatelescope.org',
options.filetype, options.timefrom, options.duration)
if len(fileresult) <= 0:
print('%s [INFO] No files found for observation %s and file type %s' % (time.strftime('%c'),
options.obs,
int(options.filetype)))
logger.info('No files found for observation %s and file type %s' % options.obs,
int(options.filetype))
sys.exit(1)

print('%s [INFO] Found %s files' % (time.strftime('%c'), str(len(fileresult))))
logger.info('Found %s files' % (str(len(fileresult))))

if len(fileresult) > 12000:
print('%s [INFO] File limit exceeded 12000, please stagger your download' % (time.strftime('%c')))
logger.error('File limit exceeded 12000, please stagger your download')
sys.exit(1)

# advise that we want to prestage all the files
filelist = []
for key, value in fileresult.iteritems():
for key, value in fileresult.items():
filelist.append(key)

prestage_files = json.dumps(filelist)
Expand All @@ -375,12 +376,13 @@ def main():
for filename, filesize in sorted(fileresult.items()):
url = 'http://%s/RETRIEVE?file_id=%s' % (options.ngashost, filename)
if not check_complete(filename, int(filesize), dir):
download_queue.put((url, filename, filesize, dir, bufsize, prestage_files))
download_queue.put((url, filename, filesize, dir,
bufsize, prestage_files))
continue
file_complete(filename)

threads = []
for t in xrange(numdownload):
for t in range(numdownload):
t = threading.Thread(target = download_queue_thread, args = (download_queue,))
t.setDaemon(True)
threads.append(t)
Expand All @@ -390,15 +392,15 @@ def main():
while t.isAlive():
t.join(timeout = 0.25)

print('%s [INFO] File Transfer Complete.' % (time.strftime('%c')))
logger.info('File Transfer Complete.')

if ERRORS:
print('%s [INFO] File Transfer Error Summary:' % (time.strftime('%c')))
logger.error('File Transfer Error Summary:')
for i in ERRORS:
print(i)
logger.error(i)
raise Exception()
else:
print('%s [INFO] File Transfer Success.' % (time.strftime('%c')))
logger.info('File Transfer Success.')


if __name__ == '__main__':
Expand Down

0 comments on commit 687d7a1

Please sign in to comment.