
Commit

Moved cython kernels to external c-functions
rkube committed Mar 2, 2020
1 parent b8eb60b commit c29216d
Showing 5 changed files with 97 additions and 30 deletions.
73 changes: 67 additions & 6 deletions analysis/kernels_spectral_cy.pyx
@@ -10,10 +10,34 @@ from cython.parallel import prange
DTYPE = np.complex128
ctypedef cnp.complex128_t DTYPE_t


from libc.complex cimport conj, csqrt, cabs, creal, cimag
from libc.math cimport atan2

cdef extern from "kernels.h":
    void kernel_coherence(double complex* fft_data,
                          double* result,
                          size_t* ch1_idx_arr,
                          size_t* ch2_idx_arr,
                          size_t num_idx,
                          size_t num_fft,
                          size_t num_bins)

    void kernel_crossphase(double complex* fft_data,
                           double* result,
                           size_t* ch1_idx_arr,
                           size_t* ch2_idx_arr,
                           size_t num_idx,
                           size_t num_fft,
                           size_t num_bins)

    void kernel_crosspower(double complex* fft_data,
                           double* result,
                           size_t* ch1_idx_arr,
                           size_t* ch2_idx_arr,
                           size_t num_idx,
                           size_t num_fft,
                           size_t num_bins)


@cython.boundscheck(False)
@cython.wraparound(False)
@@ -33,8 +57,8 @@ def kernel_coherence_cy(cnp.ndarray[cnp.complex128_t, ndim=3] data, ch_it, fft_c
    cdef cnp.ndarray[cnp.uint64_t, ndim=1] ch1_idx_arr = np.array([int(ch_pair.ch1.idx()) for ch_pair in ch_it], dtype=np.uint64)
    cdef cnp.ndarray[cnp.uint64_t, ndim=1] ch2_idx_arr = np.array([int(ch_pair.ch2.idx()) for ch_pair in ch_it], dtype=np.uint64)
    cdef cnp.ndarray[cnp.float64_t, ndim=2] result = np.zeros([num_idx, num_fft], dtype=np.float64)

    with nogil:
        for idx in prange(num_idx, schedule='static'):
            ch1_idx = ch1_idx_arr[idx]
            ch2_idx = ch2_idx_arr[idx]
@@ -46,13 +70,24 @@ def kernel_coherence_cy(cnp.ndarray[cnp.complex128_t, ndim=3] data, ch_it, fft_c
                    Pyy = data[ch2_idx, nn, bb] * conj(data[ch2_idx, nn, bb])
                    _tmp = _tmp + data[ch1_idx, nn, bb] * conj(data[ch2_idx, nn, bb]) / csqrt(Pxx * Pyy)

                result[idx, nn] = creal(cabs(_tmp)) / num_bins

    return(result)


def kernel_coherence_v2(cnp.ndarray[cnp.complex128_t, ndim=3] data, ch_it, fft_config):
    cdef size_t num_idx = len(ch_it)      # Length of index array
    cdef size_t num_fft = data.shape[1]   # Number of fft frequencies
    cdef size_t num_bins = data.shape[2]  # Number of ffts
    cdef cnp.ndarray[cnp.uintp_t, ndim=1] ch1_idx_arr = np.array([np.uintp(ch_pair.ch1.idx()) for ch_pair in ch_it], dtype=np.uint64)
    cdef cnp.ndarray[cnp.uintp_t, ndim=1] ch2_idx_arr = np.array([np.uintp(ch_pair.ch2.idx()) for ch_pair in ch_it], dtype=np.uint64)
    cdef cnp.ndarray[cnp.float64_t, ndim=2] result = np.zeros([num_idx, num_fft], dtype=np.float64)

    kernel_coherence(&data[0, 0, 0], &result[0, 0], &ch1_idx_arr[0], &ch2_idx_arr[0], num_idx, num_fft, num_bins)

    return(result)



@cython.boundscheck(False)
@cython.wraparound(False)
@@ -87,6 +122,19 @@ def kernel_crossphase_cy(cnp.ndarray[cnp.complex128_t, ndim=3] data, ch_it, fft_

    return(result)


def kernel_crossphase_v2(cnp.ndarray[cnp.complex128_t, ndim=3] data, ch_it, fft_config):
    cdef size_t num_idx = len(ch_it)      # Length of index array
    cdef size_t num_fft = data.shape[1]   # Number of fft frequencies
    cdef size_t num_bins = data.shape[2]  # Number of ffts
    cdef cnp.ndarray[cnp.uintp_t, ndim=1] ch1_idx_arr = np.array([np.uintp(ch_pair.ch1.idx()) for ch_pair in ch_it], dtype=np.uint64)
    cdef cnp.ndarray[cnp.uintp_t, ndim=1] ch2_idx_arr = np.array([np.uintp(ch_pair.ch2.idx()) for ch_pair in ch_it], dtype=np.uint64)
    cdef cnp.ndarray[cnp.float64_t, ndim=2] result = np.zeros([num_idx, num_fft], dtype=np.float64)

    kernel_crossphase(&data[0, 0, 0], &result[0, 0], &ch1_idx_arr[0], &ch2_idx_arr[0], num_idx, num_fft, num_bins)

    return(result)


@cython.boundscheck(False)
@cython.wraparound(False)
@@ -119,4 +167,17 @@ def kernel_crosspower_cy(cnp.ndarray[cnp.complex128_t, ndim=3] data, ch_it, fft_

    return(result)


def kernel_crosspower_v2(cnp.ndarray[cnp.complex128_t, ndim=3] data, ch_it, fft_config):
    cdef size_t num_idx = len(ch_it)      # Length of index array
    cdef size_t num_fft = data.shape[1]   # Number of fft frequencies
    cdef size_t num_bins = data.shape[2]  # Number of ffts
    cdef cnp.ndarray[cnp.uintp_t, ndim=1] ch1_idx_arr = np.array([np.uintp(ch_pair.ch1.idx()) for ch_pair in ch_it], dtype=np.uint64)
    cdef cnp.ndarray[cnp.uintp_t, ndim=1] ch2_idx_arr = np.array([np.uintp(ch_pair.ch2.idx()) for ch_pair in ch_it], dtype=np.uint64)
    cdef cnp.ndarray[cnp.float64_t, ndim=2] result = np.zeros([num_idx, num_fft], dtype=np.float64)

    kernel_crosspower(&data[0, 0, 0], &result[0, 0], &ch1_idx_arr[0], &ch2_idx_arr[0], num_idx, num_fft, num_bins)

    return(result)

# End of file kernels_spectral_py.pyx
12 changes: 12 additions & 0 deletions analysis/lib/Makefile
@@ -0,0 +1,12 @@
CC = icc

default: libkernels.a

libkernels.a: kernels.o
	ar rcs $@ $^

kernels.o: kernels.c kernels.h
	$(CC) -fPIC -qopenmp -O3 -c $<

clean:
	rm *.o *.a
8 changes: 8 additions & 0 deletions analysis/lib/kernels.h
@@ -0,0 +1,8 @@
#include <complex.h>
#include <stddef.h>


void kernel_coherence(double complex*, double*, size_t*, size_t*, size_t, size_t, size_t);

void kernel_crossphase(double complex*, double*, size_t*, size_t*, size_t, size_t, size_t);

void kernel_crosspower(double complex*, double*, size_t*, size_t*, size_t, size_t, size_t);
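
Note: kernels.c itself, which the Makefile compiles into libkernels.a, is not shown in this commit. Below is a minimal, hypothetical sketch of how kernel_coherence could look in plain C with OpenMP, mirroring the prange loop in kernels_spectral_cy.pyx. The row-major indexing scheme (fft_data laid out as [channel][num_fft][num_bins], matching a C-contiguous numpy array) and the omp pragma are assumptions, not the committed implementation.

/* Hypothetical sketch of kernel_coherence in kernels.c. */
#include <complex.h>
#include <stddef.h>

void kernel_coherence(double complex* fft_data, double* result,
                      size_t* ch1_idx_arr, size_t* ch2_idx_arr,
                      size_t num_idx, size_t num_fft, size_t num_bins)
{
    /* One channel pair per iteration; pairs are independent, so parallelize here. */
    #pragma omp parallel for
    for (size_t idx = 0; idx < num_idx; idx++)
    {
        size_t ch1 = ch1_idx_arr[idx];
        size_t ch2 = ch2_idx_arr[idx];
        for (size_t nn = 0; nn < num_fft; nn++)
        {
            double complex tmp = 0.0;
            for (size_t bb = 0; bb < num_bins; bb++)
            {
                /* Assumes C-contiguous layout: fft_data[ch][nn][bb]. */
                double complex x = fft_data[(ch1 * num_fft + nn) * num_bins + bb];
                double complex y = fft_data[(ch2 * num_fft + nn) * num_bins + bb];
                double complex Pxx = x * conj(x);
                double complex Pyy = y * conj(y);
                tmp += x * conj(y) / csqrt(Pxx * Pyy);
            }
            /* Average over bins, as in the Cython kernel. */
            result[idx * num_fft + nn] = cabs(tmp) / num_bins;
        }
    }
}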
15 changes: 9 additions & 6 deletions analysis/setup.py
@@ -1,4 +1,3 @@

# CC=icc LDSHARED="icc -shared" python setup.py build_ext --inplace


@@ -8,10 +7,14 @@
import numpy

 extensions = [
-    Extension("kernels_spectral_cy", ["kernels_spectral_cy.pyx"],
-              include_dirs=[numpy.get_include()],
-              extra_compile_args=['-qopenmp'],
-              extra_link_args=['-qopenmp']),
+    Extension("kernels_spectral_cy",
+              sources=["kernels_spectral_cy.pyx"],
+              libraries=["kernels"],
+              library_dirs=["lib"],
+              include_dirs=[numpy.get_include(), "lib"],
+              extra_compile_args=['-qopenmp'],
+              extra_link_args=['-qopenmp']),
 ]

-setup(name="kernels_spectral_cy", ext_modules=cythonize(extensions, compiler_directives={'language_level' : "3"}))
+setup(name="kernels_spectral_cy",
+      ext_modules=cythonize(extensions, compiler_directives={'language_level' : "3"}))
19 changes: 1 addition & 18 deletions backends/backend_mongodb.py
@@ -25,21 +25,18 @@ def __init__(self, cfg_mongo):
# Connect to mongodb

#logger = logging.getLogger("DB")
#print("mongodb_backend: Initializing mongodb backend")
self.client = pymongo.MongoClient("mongodb://mongodb07.nersc.gov/delta-fusion",
username = cfg_mongo["storage"]["username"],
password = cfg_mongo["storage"]["password"])
#print("mongodb_backend: Connection established")
db = self.client.get_database()
#print("mongodb_backend: db = ", db)
try:
self.collection = db.get_collection("test_analysis_" + cfg_mongo['run_id'])

# Initialize gridfs
self.fs = gridfs.GridFS(db)
#print("mongodb_backend: collection: ", self.collection)
except:
print("Could not get a collection")


def store_metadata(self, cfg, dispatch_seq):
"""Stores the metadata to the database
@@ -53,11 +50,6 @@ def store_metadata(self, cfg, dispatch_seq):
logger = logging.getLogger("DB")
logger.debug("backend_mongodb: entering store_metadata")

#db = self.client.get_database()
#collection = db.get_collection("test_analysis_" + cfg['run_id'])
#logger.info(f"backend_mongodb Connected to database {db.name}")
#logger.info(f"backend_mongodb Using collection {collection.name}")

j_str = serialize_dispatch_seq(dispatch_seq)
# Put the channel serialization in the corresponding key
j_str = '{"channel_serialization": ' + j_str + '}'
@@ -109,7 +101,6 @@ def store_task(self, task, future=None, dummy=True):
else:
storage_scheme["results"] = Binary(pickle.dumps(result))
self.collection.insert_one(storage_scheme)
print("***mongodb_backend*** Storing...")

return None

@@ -123,28 +114,20 @@ def store_data(self, data, info_dict):
info_dict: Dictionary with metadata to store
cfg: delta configuration object
"""
import sys
#logger = logging.get("DB")
#print(f"MongoDB: Storing data")

#info_dict['result'] = Binary(pickle.dumps(data))
# Create a binary object and store it in gridfs
fid = self.fs.put(Binary(pickle.dumps(data)))

info_dict.update({"result_gridfs": fid})
info_dict.update({"timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")})
info_dict.update({"description": "analysis results"})


#print("Inserting:")
#inserted_id = self.connection.insert_one(info_dict)
try:
inserted_id = self.collection.insert_one(info_dict)
except:
print("Unexpected error:", sys.exc_info()[0])
raise

#print(f"Wrote to MongoDB backend; id = {inserted_id}")


def store_one(self, item):
