Skip to content

Commit

Permalink
Merge pull request #119 from knix-microfunctions/release/0.9.0
Browse files Browse the repository at this point in the history
Release/0.9.0
  • Loading branch information
iakkus authored Sep 23, 2021
2 parents ff6184f + 4066efe commit abcdd3a
Show file tree
Hide file tree
Showing 58 changed files with 5,017 additions and 660 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,10 @@ public void connect (Map<String,Integer> riakNodes) {
try {
Namespace bucket = new Namespace(BUCKET_TYPE_DEFAULT, MFN_KEYSPACES);
StoreBucketProperties props = new StoreBucketProperties.Builder(bucket).withNVal(NUM_NODES).build();

long t_start = System.currentTimeMillis();
client.execute(props);
this.logExecutionTime("connect(bucketProperties)", System.currentTimeMillis() - t_start);

this.initiateBucketToTypeMapping();
} catch (Exception e) {
Expand Down Expand Up @@ -176,6 +179,11 @@ public void close() {
LOGGER.info("Riak client shutdown.");
}

private void logExecutionTime(String commandName, long duration)
{
LOGGER.info(commandName + " execution time: " + duration + " ms.");
}

private boolean detectInvalidName (String str) {
if (str == null) {
return false;
Expand Down Expand Up @@ -203,7 +211,11 @@ public boolean createKeyspace (String keyspace, Metadata metadata) {
try {
Namespace bucket = new Namespace(BUCKET_TYPE_DEFAULT, keyspace);
StoreBucketProperties props = new StoreBucketProperties.Builder(bucket).withNVal(NUM_NODES).build();

long t_start = System.currentTimeMillis();
client.execute(props);
this.logExecutionTime("createKeyspace(bucketProperties)", System.currentTimeMillis() - t_start);

LOGGER.info("createKeyspace() Keyspace: " + keyspace + " Metadata: replication factor: " + Integer.toString(replicationFactor));
return this.insertRow(MFN_KEYSPACES, null, keyspace, ByteBuffer.allocate(Integer.BYTES).putInt(replicationFactor));
} catch (Exception e) {
Expand Down Expand Up @@ -394,7 +406,11 @@ private boolean createTableWithType (String keyspace, String table, String table

Namespace bucket = new Namespace(tableType, keyspace + ";" + table);
StoreBucketProperties props = new StoreBucketProperties.Builder(bucket).withNVal(replicationFactor).withW(replicationFactor).withR(replicationFactor).withNotFoundOk(false).build();

long t_start = System.currentTimeMillis();
client.execute(props);
this.logExecutionTime("createTableWithType(bucketProperties)", System.currentTimeMillis() - t_start);

LOGGER.info("createTableWithType() Keyspace: " + keyspace + " Table: " + table + " TableType: " + tableType);
boolean success = this.insertRow(keyspace, null, table, ByteBuffer.wrap(tableType.getBytes(StandardCharsets.UTF_8)));
if (success) {
Expand Down Expand Up @@ -534,7 +550,11 @@ public boolean insertRow (String keyspace, String table, String key, ByteBuffer
Location location = new Location(bucket, key);
RiakObject object = new RiakObject().setContentType(Constants.CTYPE_OCTET_STREAM).setValue(BinaryValue.unsafeCreate(value.array()));
StoreValue store = new StoreValue.Builder(object).withLocation(location).build();

long t_start = System.currentTimeMillis();
client.execute(store);
this.logExecutionTime("insertRow()", System.currentTimeMillis() - t_start);

return true;
} catch (Exception e) {
LOGGER.error("insertRow() failed. Keyspace: " + keyspace + " Table: " + table, e);
Expand Down Expand Up @@ -570,7 +590,10 @@ public AbstractMap.SimpleEntry<String, ByteBuffer> selectRow (String keyspace, S

Location location = new Location(bucket, key);
FetchValue fetch = new FetchValue.Builder(location).build();

long t_start = System.currentTimeMillis();
FetchValue.Response response = client.execute(fetch);
this.logExecutionTime("selectRow()", System.currentTimeMillis() - t_start);

RiakObject object = response.getValue(RiakObject.class);
if (object == null || object.getValue() == null) {
Expand Down Expand Up @@ -623,13 +646,20 @@ public boolean updateRow (String keyspace, String table, String key, ByteBuffer

Location location = new Location(bucket, key);
FetchValue fetch = new FetchValue.Builder(location).withOption(FetchValue.Option.DELETED_VCLOCK, true).build();

long t_start = System.currentTimeMillis();
FetchValue.Response response = client.execute(fetch);
this.logExecutionTime("updateRow(fetch)", System.currentTimeMillis() - t_start);

RiakObject object = response.getValue(RiakObject.class);
object.setValue(BinaryValue.unsafeCreate(value.array()));

StoreValue store = new StoreValue.Builder(object).withLocation(location).build();

t_start = System.currentTimeMillis();
client.execute(store);
this.logExecutionTime("updateRow(store)", System.currentTimeMillis() - t_start);

return true;
} catch (Exception e) {
LOGGER.error("updateRow() failed. Keyspace: " + keyspace + " Table: " + table, e);
Expand Down Expand Up @@ -669,7 +699,11 @@ private boolean deleteRowWithType (String keyspace, String table, String key, St

Location location = new Location(bucket, key);
DeleteValue delete = new DeleteValue.Builder(location).build();

long t_start = System.currentTimeMillis();
client.execute(delete);
this.logExecutionTime("deleteRowWithType()", System.currentTimeMillis() - t_start);

return true;
} catch (Exception e) {
LOGGER.error("deleteRowWithType() failed. Keyspace: " + keyspace + " Table: " + table + " TableType: " + tableType, e);
Expand Down Expand Up @@ -708,7 +742,10 @@ private List<String> selectKeysWithType (String keyspace, String table, int star
}

ListKeys list = new ListKeys.Builder(bucket).build();

long t_start = System.currentTimeMillis();
ListKeys.Response response = client.execute(list);
this.logExecutionTime("selectKeysWithType()", System.currentTimeMillis() - t_start);

List<String> keys = new ArrayList<String>();
for (Location location: response) {
Expand Down Expand Up @@ -744,8 +781,11 @@ private List<String> selectAllKeysWithType (String keyspace, String table, Strin
}

ListKeys list = new ListKeys.Builder(bucket).build();
ListKeys.Response response = client.execute(list);

long t_start = System.currentTimeMillis();
ListKeys.Response response = client.execute(list);
this.logExecutionTime("selectAllKeysWithType()", System.currentTimeMillis() - t_start);

List<String> keys = new ArrayList<String>();
for (Location location: response) {
keys.add(location.getKeyAsString());
Expand Down Expand Up @@ -787,7 +827,11 @@ public AbstractMap.SimpleEntry<String, Long> getCounter (String keyspace, String
Location location = new Location(bucket, counterName);

FetchCounter fetch = new FetchCounter.Builder(location).build();

long t_start = System.currentTimeMillis();
FetchCounter.Response response = client.execute(fetch);
this.logExecutionTime("getCounter()", System.currentTimeMillis() - t_start);

RiakCounter counter = response.getDatatype();
Long counterValue = counter.view();
return new AbstractMap.SimpleEntry<String, Long>(counterName, counterValue);
Expand Down Expand Up @@ -815,7 +859,11 @@ public AbstractMap.SimpleEntry<String, Long> incrementCounter (String keyspace,

CounterUpdate delta = new CounterUpdate(increment);
UpdateCounter update = new UpdateCounter.Builder(location, delta).withReturnDatatype(true).build();

long t_start = System.currentTimeMillis();
UpdateCounter.Response response = client.execute(update);
this.logExecutionTime("incrementCounter()", System.currentTimeMillis() - t_start);

RiakCounter counter = response.getDatatype();
Long counterValue = counter.view();
return new AbstractMap.SimpleEntry<String, Long>(counterName, counterValue);
Expand Down Expand Up @@ -880,7 +928,11 @@ public AbstractMap.SimpleEntry<String, Set<String>> retrieveSet (String keyspace
Location location = new Location(bucket, setName);

FetchSet fetch = new FetchSet.Builder(location).build();

long t_start = System.currentTimeMillis();
FetchSet.Response response = client.execute(fetch);
this.logExecutionTime("retrieveSet()", System.currentTimeMillis() - t_start);

RiakSet rSet = response.getDatatype();
Set<BinaryValue> binarySet = rSet.view();

Expand All @@ -907,7 +959,10 @@ public boolean addItemToSet (String keyspace, String table, String setName, Stri

SetUpdate item = new SetUpdate().add(setItem);
UpdateSet update = new UpdateSet.Builder(location, item).build();
long t_start = System.currentTimeMillis();
client.execute(update);
this.logExecutionTime("addItemToSet()", System.currentTimeMillis() - t_start);

return true;
} catch (Exception e) {
LOGGER.error("addItemToSet() failed. Keyspace: " + keyspace + " Table: " + table, e);
Expand All @@ -926,12 +981,20 @@ public boolean removeItemFromSet (String keyspace, String table, String setName,
Location location = new Location(bucket, setName);

FetchSet fetch = new FetchSet.Builder(location).build();

long t_start = System.currentTimeMillis();
FetchSet.Response response = client.execute(fetch);
this.logExecutionTime("removeItemFromSet(fetch)", System.currentTimeMillis() - t_start);

Context context = response.getContext();

SetUpdate item = new SetUpdate().remove(setItem);
UpdateSet update = new UpdateSet.Builder(location, item).withContext(context).build();

t_start = System.currentTimeMillis();
client.execute(update);
this.logExecutionTime("removeItemFromSet(update)", System.currentTimeMillis() - t_start);

return true;
} catch (Exception e) {
LOGGER.error("removeItemFromSet() failed. Keyspace: " + keyspace + " Table: " + table, e);
Expand All @@ -950,7 +1013,11 @@ public boolean containsItemInSet (String keyspace, String table, String setName,
Location location = new Location(bucket, setName);

FetchSet fetch = new FetchSet.Builder(location).build();

long t_start = System.currentTimeMillis();
FetchSet.Response response = client.execute(fetch);
this.logExecutionTime("containsItemInSet()", System.currentTimeMillis() - t_start);

RiakSet rSet = response.getDatatype();
Set<BinaryValue> binarySet = rSet.view();
return binarySet.contains(BinaryValue.create(setItem));
Expand Down Expand Up @@ -995,7 +1062,11 @@ public int getSizeOfSet (String keyspace, String table, String setName) {
Location location = new Location(bucket, setName);

FetchSet fetch = new FetchSet.Builder(location).build();

long t_start = System.currentTimeMillis();
FetchSet.Response response = client.execute(fetch);
this.logExecutionTime("getSizeOfSet()", System.currentTimeMillis() - t_start);

RiakSet rSet = response.getDatatype();
Set<BinaryValue> binarySet = rSet.view();
return binarySet.size();
Expand Down Expand Up @@ -1039,7 +1110,11 @@ public AbstractMap.SimpleEntry<String, Set<String>> retrieveKeysetFromMap (Strin
Location location = new Location(bucket, mapName);

FetchMap fetch = new FetchMap.Builder(location).build();

long t_start = System.currentTimeMillis();
FetchMap.Response response = client.execute(fetch);
this.logExecutionTime("retrieveKeysetFromMap()", System.currentTimeMillis() - t_start);

RiakMap rMap = response.getDatatype();
Map<BinaryValue, List<RiakDatatype>> entries = rMap.view();

Expand All @@ -1065,7 +1140,11 @@ public AbstractMap.SimpleEntry<String, Map<String, ByteBuffer>> retrieveAllEntri
Location location = new Location(bucket, mapName);

FetchMap fetch = new FetchMap.Builder(location).build();

long t_start = System.currentTimeMillis();
FetchMap.Response response = client.execute(fetch);
this.logExecutionTime("retrieveAllEntriesFromMap()", System.currentTimeMillis() - t_start);

RiakMap rMap = response.getDatatype();
Map<BinaryValue, List<RiakDatatype>> entries = rMap.view();

Expand Down Expand Up @@ -1096,7 +1175,11 @@ public boolean putEntryToMap (String keyspace, String table, String mapName, Str
RegisterUpdate register = new RegisterUpdate(BinaryValue.unsafeCreate(entryValue.array()));
MapUpdate entry = new MapUpdate().update(entryKey, register);
UpdateMap update = new UpdateMap.Builder(location, entry).build();

long t_start = System.currentTimeMillis();
client.execute(update);
this.logExecutionTime("putEntryToMap()", System.currentTimeMillis() - t_start);

return true;
} catch (Exception e) {
LOGGER.error("putEntryToMap() failed. Keyspace: " + keyspace + " Table: " + table, e);
Expand All @@ -1115,7 +1198,11 @@ public AbstractMap.SimpleEntry<String, ByteBuffer> getEntryFromMap (String keysp
Location location = new Location(bucket, mapName);

FetchMap fetch = new FetchMap.Builder(location).build();

long t_start = System.currentTimeMillis();
FetchMap.Response response = client.execute(fetch);
this.logExecutionTime("getEntryFromMap()", System.currentTimeMillis() - t_start);

RiakMap rMap = response.getDatatype();
RiakRegister rRegister = rMap.getRegister(entryKey);
ByteBuffer entryValue = ByteBuffer.wrap(rRegister.view().unsafeGetValue());
Expand All @@ -1137,12 +1224,20 @@ public boolean removeEntryFromMap (String keyspace, String table, String mapName
Location location = new Location(bucket, mapName);

FetchMap fetch = new FetchMap.Builder(location).build();

long t_start = System.currentTimeMillis();
FetchMap.Response response = client.execute(fetch);
this.logExecutionTime("removeEntryFromMap(fetch)", System.currentTimeMillis() - t_start);

Context context = response.getContext();

MapUpdate entry = new MapUpdate().removeRegister(entryKey);
UpdateMap update = new UpdateMap.Builder(location, entry).withContext(context).build();

t_start = System.currentTimeMillis();
client.execute(update);
this.logExecutionTime("removeEntryFromMap(update)", System.currentTimeMillis() - t_start);

return true;
} catch (Exception e) {
LOGGER.error("removeEntryFromMap() failed. Keyspace: " + keyspace + " Table: " + table, e);
Expand All @@ -1161,7 +1256,11 @@ public boolean containsKeyInMap (String keyspace, String table, String mapName,
Location location = new Location(bucket, mapName);

FetchMap fetch = new FetchMap.Builder(location).build();

long t_start = System.currentTimeMillis();
FetchMap.Response response = client.execute(fetch);
this.logExecutionTime("containsKeyInMap()", System.currentTimeMillis() - t_start);

RiakMap rMap = response.getDatatype();
Map<BinaryValue, List<RiakDatatype>> entries = rMap.view();
return entries.containsKey(BinaryValue.create(entryKey));
Expand Down Expand Up @@ -1206,7 +1305,11 @@ public int getSizeOfMap (String keyspace, String table, String mapName) {
Location location = new Location(bucket, mapName);

FetchMap fetch = new FetchMap.Builder(location).build();

long t_start = System.currentTimeMillis();
FetchMap.Response response = client.execute(fetch);
this.logExecutionTime("getSizeOfMap()", System.currentTimeMillis() - t_start);

RiakMap rMap = response.getDatatype();
Map<BinaryValue, List<RiakDatatype>> entries = rMap.view();
return entries.size();
Expand Down
5 changes: 4 additions & 1 deletion FunctionWorker/python/DataLayerClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ def __init__(self, locality=1, sid=None, wid=None, suid=None, is_wf_private=Fals
#print("Creating datalayer client in keyspace=%s, tablename=%s, maptablename=%s, settablename=%s, countertablename=%s" % (self.keyspace,self.tablename, self.maptablename, self.settablename, self.countertablename))
self.locality = locality

self._is_running = True

self.connect()

if init_tables:
Expand Down Expand Up @@ -105,7 +107,7 @@ def _drop_keyspace(self):

def connect(self):
retry = 0.5 #s
while True:
while self._is_running:
try:
host, port = self.dladdress.split(':')
self.socket = TSocket.TSocket(host, int(port))
Expand Down Expand Up @@ -607,6 +609,7 @@ def listKeys(self, start, count, tableName=None):
return keys_response

def shutdown(self):
self._is_running = False
try:
self.transport.close()
except Thrift.TException as exc:
Expand Down
Loading

0 comments on commit abcdd3a

Please sign in to comment.