Update README to show pagerank and add DataAuths to python user
gh-595-pyspark-api
Add to python-api README
Include data auths in the python user
Change PythonSerialiserConfig to look for the nested json object "serialisers" within the python config json
JSWard committed Mar 29, 2019
1 parent c1234a5 commit b240d9f
Showing 3 changed files with 50 additions and 10 deletions.
35 changes: 34 additions & 1 deletion python-api/README.md
````diff
@@ -121,7 +121,7 @@ rdd_op = gp.GetPySparkRDDOfAllElements(view=view)
 rdd = graph.execute(rdd_op, user)
 ```
 
-USe pyspark to calculate the distribution of counts
+Use pyspark to calculate the distribution of counts
 
 ```
 def getCount(edge):
@@ -130,6 +130,39 @@ def getCount(edge):
 count_distribution = rdd.map(getCount).reduceByKey(lambda a, b: a + b).collect()
 ```
 
+Use pyspark to create a GraphFrame
+
+```
+from gafferpy_pyspark import gaffer_pyspark as gp
+from graphframes import *
+edge=g.ElementDefinition(group="YOUR_EDGE_GROUP",group_by=[])
+entity=g.ElementDefinition(group="YOUR_ENTITY_GROUP",group_by=[])
+entityView=g.View(
+    entities=[entity]
+)
+edgeView=g.View(
+    edges=[edge]
+)
+df_entity_op = gp.GetPysparkDataFrameOfElements(entityView, sampleRatio=0.1)
+df_edge_op = gp.GetPysparkDataFrameOfElements(edgeView, sampleRatio=0.1)
+df_entity = graph.execute(df_entity_op, user)
+df_edge = graph.execute(df_edge_op, user)
+edges = df_edge.withColumnRenamed("destination", "dst").withColumnRenamed("source", "src")
+entities = df_entity.withColumnRenamed("vertex", "id")
+gf = GraphFrame(entities, edges)
+```
+
+Use pyspark to run Page Rank on a GraphFrame
+
+```
+results = gf.pageRank(resetProbability=0.15, maxIter=10)
+results.vertices.show()
+```
+
 ### Use with larger graphs ###
 
 If you already have a large Gaffer instance running and want to use the pyspark api with it, follow the same steps above except that you will need to point to the larger graph's schema, graphconfig and store-properties files when you create the python graph object.
````
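The "Use with larger graphs" note amounts to swapping in the larger deployment's config file paths. A minimal sketch of the idea; the helper function and dictionary keys below are illustrative assumptions, not the gafferpy API:

```python
# Hypothetical helper, not part of gafferpy: it only illustrates that pointing
# the python graph object at an existing large Gaffer instance is a matter of
# supplying that deployment's schema, graph config and store properties files.
def large_graph_config(deployment_dir):
    return {
        "schemaPath": deployment_dir + "/schema.json",
        "graphConfigPath": deployment_dir + "/graphconfig.json",
        "storePropertiesPath": deployment_dir + "/store.properties",
    }

paths = large_graph_config("/opt/gaffer/large-graph")
```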
````diff
@@ -51,17 +51,18 @@ public PythonSerialiserConfig(final FileInputStream fis) {
     }
 
     public PythonSerialiserConfig(final byte[] bytes) {
-        Map<String, String> map = null;
+        Map<String, Object> map = null;
+        Map<String, String> serialisersMap = null;
         this.serialisers = new HashMap<>();
         try {
             map = JSONSerialiser.deserialise(bytes, Map.class);
+            serialisersMap = (Map<String, String>) map.get("serialisers");
         } catch (final SerialisationException e) {
             e.printStackTrace();
         }
-
-        for (final String s : map.keySet()) {
+        for (final String s : serialisersMap.keySet()) {
             try {
-                this.serialisers.put(Class.forName(s), Class.forName(map.get(s)));
+                this.serialisers.put(Class.forName(s), Class.forName(serialisersMap.get(s)));
             } catch (final ClassNotFoundException e) {
                 e.printStackTrace();
             }
````
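After this Java change, PythonSerialiserConfig reads the serialiser mappings from a nested "serialisers" object in the python config JSON rather than from the top level. A sketch of the shape it now expects, mirrored in Python; the class names in the sample JSON are made up for illustration:

```python
import json

# Illustrative config: serialiser mappings now sit under a nested
# "serialisers" key. The class names here are placeholders, not real
# Gaffer classes.
config_text = """
{
    "serialisers": {
        "com.example.ElementType": "com.example.ElementTypeSerialiser"
    }
}
"""

config = json.loads(config_text)
# Mirrors the Java side's map.get("serialisers")
serialisers = config["serialisers"]
```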
````diff
@@ -97,9 +97,15 @@ def convertElement(input):
 class User(g.ToJson):
     CLASS='uk.gov.gchq.gaffer.user.User'
 
-    def __init__(self, user_id=None):
+    def __init__(self, user_id=None, data_auths=None):
         self._class_name=self.CLASS,
         self.user_id=user_id;
+        self.data_auths=data_auths;
 
     def to_json(self):
-        return {'userId': self.user_id}
+        json = {}
+        if self.user_id is not None:
+            json['userId'] = self.user_id
+        if self.data_auths is not None:
+            json['dataAuths'] = self.data_auths
+        return json
````

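The new to_json only emits fields that were actually set, so a User without data auths serialises exactly as before. A stripped-down, runnable sketch of that behaviour (the real gafferpy User also carries its Java class name and extends g.ToJson):

```python
# Minimal standalone version of the updated serialisation logic, for
# illustration only; not the full gafferpy User class.
class User:
    def __init__(self, user_id=None, data_auths=None):
        self.user_id = user_id
        self.data_auths = data_auths

    def to_json(self):
        # Include only the fields that were actually set
        body = {}
        if self.user_id is not None:
            body['userId'] = self.user_id
        if self.data_auths is not None:
            body['dataAuths'] = self.data_auths
        return body
```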