-
Notifications
You must be signed in to change notification settings - Fork 45
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
finished and tested the initial version of smart parking based fog fu…
…nctions
- Loading branch information
Showing
59 changed files
with
6,423 additions
and
182 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
npm install | ||
docker build -t "fogflow/connectedcar" . |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
FROM mhart/alpine-node | ||
|
||
WORKDIR /app | ||
ADD . /app | ||
ENTRYPOINT [ "node", "main.js" ] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
// define global variables or include third-party libraries (notice that the library must be specified in the package.json file as well) | ||
var speakerID = null; | ||
|
||
// | ||
// contextEntity: the received entities | ||
// publish, query, and subscribe are the callback functions for your own function to interact with the assigned nearby broker | ||
// publish: publish the generated entity, which could be a new entity or the update of an existing entity | ||
// query: query addtional information from the assigned nearby broker | ||
// subscribe: subscribe addtional infromation from the assigned nearby broker | ||
// | ||
exports.handler = function(contextEntity, publish, query, subscribe) | ||
{ | ||
console.log("enter into the user-defined fog function"); | ||
|
||
if (contextEntity == null) { | ||
return; | ||
} | ||
if (contextEntity.attributes == null) { | ||
return; | ||
} | ||
|
||
// ============================== publish ====================================================== | ||
|
||
// if you need to publish the generated result, please refer to the following example | ||
|
||
/* | ||
var updateEntity = {}; | ||
updateEntity.entityId = { | ||
id: "Twin.Home.0001", | ||
type: 'Home', | ||
isPattern: false | ||
}; | ||
updateEntity.attributes = {}; | ||
updateEntity.attributes.city = {type: 'string', value: 'Heidelberg'}; | ||
updateEntity.metadata = {}; | ||
updateEntity.metadata.location = { | ||
type: 'point', | ||
value: {'latitude': 33.0, 'longitude': -1.0} | ||
}; | ||
publish(updateEntity); | ||
console.log("publish: ", updateEntity); | ||
*/ | ||
|
||
|
||
// ============================== query ====================================================== | ||
|
||
// if you want to query addtional information from the assigned nearby broker, please refer to the following example | ||
|
||
/* | ||
var queryReq = {} | ||
queryReq.entities = [{type:'PublicSite', isPattern: true}]; | ||
var handleQueryResult = function(entityList) { | ||
for(var i=0; i<entityList.length; i++) { | ||
var entity = entityList[i]; | ||
console.log('===============' + i + '==================='); | ||
console.log(entity); | ||
} | ||
} | ||
query(queryReq, handleQueryResult); | ||
*/ | ||
|
||
|
||
// ============================== subscribe ====================================================== | ||
|
||
// if you want to subscribe addtional infromation from the assigned nearby broker, please refer to the following example | ||
|
||
/* | ||
var subscribeCtxReq = {}; | ||
subscribeCtxReq.entities = [{type: 'Home', isPattern: true}]; | ||
subscribeCtxReq.attributes = ['alert']; | ||
//subscribeCtxReq.restriction = {scopes: [{scopeType: 'stringQuery', scopeValue: 'geohash='+contextEntity.attributes.geohash.value}]} | ||
subscribe(subscribeCtxReq); | ||
*/ | ||
|
||
}; | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,202 @@ | ||
'use strict'; | ||
|
||
const NGSIClient = require('./ngsi/ngsiclient.js'); | ||
const NGSIAgent = require('./ngsi/ngsiagent.js'); | ||
const fogfunction = require('./function.js'); | ||
|
||
var ngsi10client = null; | ||
var brokerURL; | ||
var outputs = []; | ||
var threshold = 30; | ||
var myReferenceURL; | ||
var mySubscriptionId = null; | ||
var isConfigured = false; | ||
|
||
var buffer = []; | ||
|
||
function startApp() | ||
{ | ||
console.log('start to receive input data streams via a listening port'); | ||
} | ||
|
||
function stopApp() | ||
{ | ||
console.log('clean up the app'); | ||
} | ||
|
||
// handle the commands received from the engine | ||
function handleAdmin(req, commands, res) | ||
{ | ||
console.log('=============configuration commands============='); | ||
console.log(commands); | ||
|
||
handleCmds(commands); | ||
|
||
isConfigured = true; | ||
|
||
res.status(200).json({}); | ||
} | ||
|
||
function handleCmds(commands) | ||
{ | ||
for(var i = 0; i < commands.length; i++) { | ||
var cmd = commands[i]; | ||
console.log(cmd); | ||
handleCmd(cmd); | ||
console.log("handle next command"); | ||
} | ||
|
||
// send the updates in the buffer | ||
sendUpdateWithinBuffer(); | ||
} | ||
|
||
function handleCmd(commandObj) | ||
{ | ||
if (commandObj.command == 'CONNECT_BROKER') { | ||
connectBroker(commandObj); | ||
} else if (commandObj.command == 'SET_OUTPUTS') { | ||
setOutputs(commandObj); | ||
} else if (commandObj.command == 'SET_REFERENCE'){ | ||
setReferenceURL(commandObj); | ||
} | ||
} | ||
|
||
// connect to the IoT Broker | ||
function connectBroker(cmd) | ||
{ | ||
brokerURL = cmd.brokerURL; | ||
ngsi10client = new NGSIClient.NGSI10Client(brokerURL); | ||
console.log('connected to broker', cmd.brokerURL); | ||
} | ||
|
||
function setReferenceURL(cmd) | ||
{ | ||
myReferenceURL = cmd.url | ||
console.log('your application can subscribe addtional inputs under the reference URL: ', myReferenceURL); | ||
} | ||
|
||
|
||
function setOutputs(cmd) | ||
{ | ||
var outputStream = {}; | ||
outputStream.id = cmd.id; | ||
outputStream.type = cmd.type; | ||
|
||
outputs.push(outputStream); | ||
|
||
console.log('output has been set: ', cmd); | ||
} | ||
|
||
function sendUpdateWithinBuffer() | ||
{ | ||
for(var i=0; i<buffer.length; i++){ | ||
var tmp = buffer[i]; | ||
|
||
if (tmp.outputIdx > 0) { | ||
tmp.ctxObj.entityId.id = outputs[i].id; | ||
tmp.ctxObj.entityId.type = outputs[i].type; | ||
} | ||
|
||
ngsi10client.updateContext(tmp.ctxObj).then( function(data) { | ||
console.log('======send update======'); | ||
console.log(data); | ||
}).catch(function(error) { | ||
console.log(error); | ||
console.log('failed to update context'); | ||
}); | ||
} | ||
|
||
buffer= []; | ||
} | ||
|
||
// | ||
// query results from the assigned nearby IoT broker | ||
// | ||
function query(queryCtxReq, f) | ||
{ | ||
if (ngsi10client == null) { | ||
console.log("=== broker is not configured for your query"); | ||
return | ||
} | ||
|
||
ngsi10client.queryContext(queryCtxReq).then(f).catch( function(error) { | ||
console.log('failed to subscribe context'); | ||
}); | ||
} | ||
|
||
// | ||
// send subscriptions to IoT broker | ||
// | ||
function subscribe(subscribeCtxReq) | ||
{ | ||
if (ngsi10client == null) { | ||
console.log("=== broker is not configured for your subscription"); | ||
return | ||
} | ||
|
||
subscribeCtxReq.reference = myReferenceURL; | ||
|
||
console.log("================trigger my own subscription==================="); | ||
console.log(subscribeCtxReq); | ||
|
||
ngsi10client.subscribeContext(subscribeCtxReq).then( function(subscriptionId) { | ||
console.log("subscription id = " + subscriptionId); | ||
mySubscriptionId = subscriptionId; | ||
}).catch(function(error) { | ||
console.log('failed to subscribe context'); | ||
}); | ||
} | ||
|
||
// | ||
// publish context entities: | ||
// | ||
function publish(ctxUpdate) | ||
{ | ||
buffer.push(ctxUpdate) | ||
|
||
if (ngsi10client == null) { | ||
console.log("=== broker is not configured for your update"); | ||
return | ||
} | ||
|
||
for(var i=0; i<buffer.length; i++){ | ||
var update = buffer[i]; | ||
|
||
ngsi10client.updateContext(update).then( function(data) { | ||
console.log('======send update======'); | ||
console.log(data); | ||
}).catch(function(error) { | ||
console.log(error); | ||
console.log('failed to update context'); | ||
}); | ||
} | ||
|
||
buffer= []; | ||
} | ||
|
||
// handle the received results | ||
function handleNotify(req, ctxObjects, res) | ||
{ | ||
console.log('============handle notify=========================='); | ||
for(var i = 0; i < ctxObjects.length; i++) { | ||
console.log(ctxObjects[i]); | ||
fogfunction.handler(ctxObjects[i], publish, query, subscribe); | ||
} | ||
} | ||
|
||
// get the listening port number from the environment variables given by the FogFlow edge worker | ||
var myport = process.env.myport; | ||
|
||
// set up the NGSI agent to listen on | ||
NGSIAgent.setNotifyHandler(handleNotify); | ||
NGSIAgent.setAdminHandler(handleAdmin); | ||
NGSIAgent.start(myport, startApp); | ||
|
||
process.on('SIGINT', function() { | ||
NGSIAgent.stop(); | ||
stopApp(); | ||
|
||
process.exit(0); | ||
}); | ||
|
||
|
Oops, something went wrong.