From c29216d3a1d39b6faebe14845503ad2131eb6d4a Mon Sep 17 00:00:00 2001 From: Ralph Kube Date: Mon, 2 Mar 2020 13:21:02 -0800 Subject: [PATCH] Moved cython kernels to external c-functions --- analysis/kernels_spectral_cy.pyx | 73 +++++++++++++++++++++++++++++--- analysis/lib/Makefile | 12 ++++++ analysis/lib/kernels.h | 8 ++++ analysis/setup.py | 15 ++++--- backends/backend_mongodb.py | 19 +-------- 5 files changed, 97 insertions(+), 30 deletions(-) create mode 100644 analysis/lib/Makefile create mode 100644 analysis/lib/kernels.h diff --git a/analysis/kernels_spectral_cy.pyx b/analysis/kernels_spectral_cy.pyx index 4f95d8f..4f6870c 100644 --- a/analysis/kernels_spectral_cy.pyx +++ b/analysis/kernels_spectral_cy.pyx @@ -10,10 +10,34 @@ from cython.parallel import prange DTYPE = np.complex128 ctypedef cnp.complex128_t DTYPE_t - from libc.complex cimport conj, csqrt, cabs, creal, cimag from libc.math cimport atan2 +cdef extern from "kernels.h": + void kernel_coherence(double complex* fft_data, + double* result, + size_t* ch1_idx_arr, + size_t* ch2_idx_arr, + size_t num_idx, + size_t num_fft, + size_t num_bins) + + void kernel_crossphase(double complex* fft_data, + double* result, + size_t* ch1_idx_arr, + size_t* ch2_idx_arr, + size_t num_idx, + size_t num_fft, + size_t num_bins) + + void kernel_crosspower(double complex* fft_data, + double* result, + size_t* ch1_idx_arr, + size_t* ch2_idx_arr, + size_t num_idx, + size_t num_fft, + size_t num_bins) + @cython.boundscheck(False) @cython.wraparound(False) @@ -33,8 +57,8 @@ def kernel_coherence_cy(cnp.ndarray[cnp.complex128_t, ndim=3] data, ch_it, fft_c cdef cnp.ndarray[cnp.uint64_t, ndim=1] ch1_idx_arr = np.array([int(ch_pair.ch1.idx()) for ch_pair in ch_it], dtype=np.uint64) cdef cnp.ndarray[cnp.uint64_t, ndim=1] ch2_idx_arr = np.array([int(ch_pair.ch2.idx()) for ch_pair in ch_it], dtype=np.uint64) cdef cnp.ndarray[cnp.float64_t, ndim=2] result = np.zeros([num_idx, num_fft], dtype=np.float64) - - with nogil: + + with nogil: for idx in prange(num_idx, schedule=static): ch1_idx = ch1_idx_arr[idx] ch2_idx = ch2_idx_arr[idx] @@ -46,13 +70,24 @@ def kernel_coherence_cy(cnp.ndarray[cnp.complex128_t, ndim=3] data, ch_it, fft_c Pyy = data[ch2_idx, nn, bb] * conj(data[ch2_idx, nn, bb]) _tmp = _tmp + data[ch1_idx, nn, bb] * conj(data[ch2_idx, nn, bb]) / csqrt(Pxx * Pyy) - #_tmp /= num_bins - result[idx, nn] = creal(cabs(_tmp)) / num_bins - return(result) + return(result) + +def kernel_coherence_v2(cnp.ndarray[cnp.complex128_t, ndim=3] data, ch_it, fft_config): + cdef size_t num_idx = len(ch_it) # Length of index array + cdef size_t num_fft = data.shape[1] # Number of fft frequencies + cdef size_t num_bins = data.shape[2] # Number of ffts + cdef cnp.ndarray[cnp.uintp_t, ndim=1] ch1_idx_arr = np.array([np.uintp(ch_pair.ch1.idx()) for ch_pair in ch_it], dtype=np.uint64) + cdef cnp.ndarray[cnp.uintp_t, ndim=1] ch2_idx_arr = np.array([np.uintp(ch_pair.ch2.idx()) for ch_pair in ch_it], dtype=np.uint64) + cdef cnp.ndarray[cnp.float64_t, ndim=2] result = np.zeros([num_idx, num_fft], dtype=np.float64) + kernel_coherence(&data[0, 0, 0], &result[0, 0], &ch1_idx_arr[0], &ch2_idx_arr[0], num_idx, num_fft, num_bins) + + return(result) + + @cython.boundscheck(False) @cython.wraparound(False) @@ -87,6 +122,19 @@ def kernel_crossphase_cy(cnp.ndarray[cnp.complex128_t, ndim=3] data, ch_it, fft_ return(result) + +def kernel_crossphase_v2(cnp.ndarray[cnp.complex128_t, ndim=3] data, ch_it, fft_config): + cdef size_t num_idx = len(ch_it) # Length of index array + cdef size_t num_fft = data.shape[1] # Number of fft frequencies + cdef size_t num_bins = data.shape[2] # Number of ffts + cdef cnp.ndarray[cnp.uintp_t, ndim=1] ch1_idx_arr = np.array([np.uintp(ch_pair.ch1.idx()) for ch_pair in ch_it], dtype=np.uint64) + cdef cnp.ndarray[cnp.uintp_t, ndim=1] ch2_idx_arr = np.array([np.uintp(ch_pair.ch2.idx()) for ch_pair in ch_it], dtype=np.uint64) + cdef cnp.ndarray[cnp.float64_t, ndim=2] result = np.zeros([num_idx, num_fft], dtype=np.float64) + + kernel_crossphase(&data[0, 0, 0], &result[0, 0], &ch1_idx_arr[0], &ch2_idx_arr[0], num_idx, num_fft, num_bins) + + return(result) + @cython.boundscheck(False) @cython.wraparound(False) @@ -119,4 +167,17 @@ def kernel_crosspower_cy(cnp.ndarray[cnp.complex128_t, ndim=3] data, ch_it, fft_ return(result) + +def kernel_crosspower_v2(cnp.ndarray[cnp.complex128_t, ndim=3] data, ch_it, fft_config): + cdef size_t num_idx = len(ch_it) # Length of index array + cdef size_t num_fft = data.shape[1] # Number of fft frequencies + cdef size_t num_bins = data.shape[2] # Number of ffts + cdef cnp.ndarray[cnp.uintp_t, ndim=1] ch1_idx_arr = np.array([np.uintp(ch_pair.ch1.idx()) for ch_pair in ch_it], dtype=np.uint64) + cdef cnp.ndarray[cnp.uintp_t, ndim=1] ch2_idx_arr = np.array([np.uintp(ch_pair.ch2.idx()) for ch_pair in ch_it], dtype=np.uint64) + cdef cnp.ndarray[cnp.float64_t, ndim=2] result = np.zeros([num_idx, num_fft], dtype=np.float64) + + kernel_crosspower(&data[0, 0, 0], &result[0, 0], &ch1_idx_arr[0], &ch2_idx_arr[0], num_idx, num_fft, num_bins) + + return(result) + # End of file kernels_spectral_py.pyx \ No newline at end of file diff --git a/analysis/lib/Makefile b/analysis/lib/Makefile new file mode 100644 index 0000000..af864f8 --- /dev/null +++ b/analysis/lib/Makefile @@ -0,0 +1,12 @@ +CC = icc + +default: libkernels.a + +libkernels.a: kernels.o + ar rcs $@ $^ + +kernels.o: kernels.c kernels.h + $(CC) -fPIC -qopenmp -O3 -c $< + +clean: + rm *.o *.a diff --git a/analysis/lib/kernels.h b/analysis/lib/kernels.h new file mode 100644 index 0000000..32132f0 --- /dev/null +++ b/analysis/lib/kernels.h @@ -0,0 +1,8 @@ +#include + + +void kernel_coherence(double complex*, double*, size_t*, size_t*, size_t, size_t, size_t); + +void kernel_crossphase(double complex*, double*, size_t*, size_t*, size_t, size_t, size_t); + +void kernel_crosspower(double complex*, double*, size_t*, size_t*, size_t, size_t, size_t); \ No newline at end of file diff --git a/analysis/setup.py b/analysis/setup.py index 2f2c9ef..51da02e 100644 --- a/analysis/setup.py +++ b/analysis/setup.py @@ -1,4 +1,3 @@ - # CC=icc LDSHARED="icc -shared" python setup.py build_ext --inplace @@ -8,10 +7,14 @@ import numpy extensions = [ - Extension("kernels_spectral_cy", ["kernels_spectral_cy.pyx"], - include_dirs=[numpy.get_include()], - extra_compile_args=['-qopenmp'], - extra_link_args=['-qopenmp']), + Extension("kernels_spectral_cy", + sources=["kernels_spectral_cy.pyx"], + libraries=["kernels"], + library_dirs=["lib"], + include_dirs=[numpy.get_include(), "lib"], + extra_compile_args=['-qopenmp'], + extra_link_args=['-qopenmp']), ] -setup(name="kernels_spectral_cy", ext_modules=cythonize(extensions, compiler_directives={'language_level' : "3"})) \ No newline at end of file +setup(name="kernels_spectral_cy", + ext_modules=cythonize(extensions, compiler_directives={'language_level' : "3"})) \ No newline at end of file diff --git a/backends/backend_mongodb.py b/backends/backend_mongodb.py index 32f249a..ca09384 100644 --- a/backends/backend_mongodb.py +++ b/backends/backend_mongodb.py @@ -25,21 +25,18 @@ def __init__(self, cfg_mongo): # Connect to mongodb #logger = logging.getLogger("DB") - #print("mongodb_backend: Initializing mongodb backend") self.client = pymongo.MongoClient("mongodb://mongodb07.nersc.gov/delta-fusion", username = cfg_mongo["storage"]["username"], password = cfg_mongo["storage"]["password"]) - #print("mongodb_backend: Connection established") db = self.client.get_database() - #print("mongodb_backend: db = ", db) try: self.collection = db.get_collection("test_analysis_" + cfg_mongo['run_id']) # Initialize gridfs self.fs = gridfs.GridFS(db) - #print("mongodb_backend: collection: ", self.collection) except: print("Could not get a collection") + def store_metadata(self, cfg, dispatch_seq): """Stores the metadata to the database @@ -53,11 +50,6 @@ def store_metadata(self, cfg, dispatch_seq): logger = logging.getLogger("DB") logger.debug("backend_mongodb: entering store_metadata") - #db = self.client.get_database() - #collection = db.get_collection("test_analysis_" + cfg['run_id']) - #logger.info(f"backend_mongodb Connected to database {db.name}") - #logger.info(f"backend_mongodb Using collection {collection.name}") - j_str = serialize_dispatch_seq(dispatch_seq) # Put the channel serialization in the corresponding key j_str = '{"channel_serialization": ' + j_str + '}' @@ -109,7 +101,6 @@ def store_task(self, task, future=None, dummy=True): else: storage_scheme["results"] = Binary(pickle.dumps(result)) self.collection.insert_one(storage_scheme) - print("***mongodb_backend*** Storing...") return None @@ -123,11 +114,7 @@ def store_data(self, data, info_dict): info_dict: Dictionary with metadata to store cfg: delta configuration object """ - import sys - #logger = logging.get("DB") - #print(f"MongoDB: Storing data") - #info_dict['result'] = Binary(pickle.dumps(data)) # Create a binary object and store it in gridfs fid = self.fs.put(Binary(pickle.dumps(data))) @@ -135,16 +122,12 @@ def store_data(self, data, info_dict): info_dict.update({"timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")}) info_dict.update({"description": "analysis results"}) - - #print("Inserting:") - #inserted_id = self.connection.insert_one(info_dict) try: inserted_id = self.collection.insert_one(info_dict) except: print("Unexpected error:", sys.exc_info()[0]) raise - #print(f"Wrote to MongoDB backend; id = {inserted_id}") def store_one(self, item):