This repository has been archived by the owner on Oct 16, 2023. It is now read-only.

Commit

Update README to show pagerank and add DataAuths to python user (#707)
gh-595-pyspark-api
Add to python-api README
Include data auths in the python user
Change PythonSerialiserConfig to look for the nested json object "serialisers" within the python config json
JSWard authored and m316257 committed Apr 1, 2019
1 parent 71c1980 commit d727fd8
Showing 3 changed files with 50 additions and 10 deletions.
35 changes: 34 additions & 1 deletion python-api/README.md
@@ -121,7 +121,7 @@ rdd_op = gp.GetPySparkRDDOfAllElements(view=view)
rdd = graph.execute(rdd_op, user)
```

- USe pyspark to calculate the distribution of counts
+ Use pyspark to calculate the distribution of counts

```
def getCount(edge):
@@ -130,6 +130,39 @@ def getCount(edge):
count_distribution = rdd.map(getCount).reduceByKey(lambda a, b: a + b).collect()
```
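The body of `getCount` is collapsed between the two hunks above. As a rough sketch only, not taken from this commit, such a function could key each element by its count so that `reduceByKey` tallies how many elements share each count value; the `'count'` property name and dict-style access are assumptions about the schema and element representation:

```
# Illustrative sketch only: emit (count value, 1) per element so that
# reduceByKey(lambda a, b: a + b) sums the occurrences of each count.
def getCount(edge):
    count = edge['properties'].get('count', 0)   # property name assumed
    return (count, 1)
```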

Use pyspark to create a GraphFrame

```
from gafferpy_pyspark import gaffer_pyspark as gp
from graphframes import *
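# `g` is the core gafferpy module, and `graph`/`user` come from the setup
# steps earlier in this README (not shown in this diff)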
edge = g.ElementDefinition(group="YOUR_EDGE_GROUP", group_by=[])
entity = g.ElementDefinition(group="YOUR_ENTITY_GROUP", group_by=[])
entityView = g.View(
    entities=[entity]
)
edgeView = g.View(
    edges=[edge]
)
df_entity_op = gp.GetPysparkDataFrameOfElements(entityView, sampleRatio=0.1)
df_edge_op = gp.GetPysparkDataFrameOfElements(edgeView, sampleRatio=0.1)
df_entity = graph.execute(df_entity_op, user)
df_edge = graph.execute(df_edge_op, user)
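# GraphFrames expects an 'id' column on vertices and 'src'/'dst' columns
# on edges, hence the renames below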
edges = df_edge.withColumnRenamed("destination", "dst").withColumnRenamed("source", "src")
entities = df_entity.withColumnRenamed("vertex", "id")
gf = GraphFrame(entities, edges)
```

Use pyspark to run PageRank on a GraphFrame

```
results = gf.pageRank(resetProbability=0.15, maxIter=10)
results.vertices.show()
```
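`results` is itself a GraphFrame; its vertices DataFrame carries an additional `pagerank` column holding the computed scores.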

### Use with larger graphs ###

If you already have a large Gaffer instance running and want to use the pyspark API with it, follow the same steps as above, except that you will need to point to the larger graph's schema, graphconfig and store-properties files when you create the python graph object.
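As a rough illustration only: the actual graph constructor lives in the setup section earlier in this README (not shown in this diff), and the function and parameter names below are placeholders rather than the real gafferpy_pyspark API:

```
# Placeholder names only; use the graph-creation call from the setup
# section of this README, pointed at the existing graph's files.
graph = gp.Graph(
    schema="/path/to/large-graph/schema.json",
    graph_config="/path/to/large-graph/graphconfig.json",
    store_properties="/path/to/large-graph/store.properties"
)
```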
@@ -51,17 +51,18 @@ public PythonSerialiserConfig(final FileInputStream fis) {
     }
 
     public PythonSerialiserConfig(final byte[] bytes) {
-        Map<String, String> map = null;
+        Map<String, Object> map = null;
+        Map<String, String> serialisersMap = null;
         this.serialisers = new HashMap<>();
         try {
             map = JSONSerialiser.deserialise(bytes, Map.class);
-        } catch (final SerialisationException e) {
-            e.printStackTrace();
-        }
-
-        for (final String s : map.keySet()) {
+            serialisersMap = (Map<String, String>) map.get("serialisers");
+        } catch (final SerialisationException e) {
+            e.printStackTrace();
+        }
+        for (final String s : serialisersMap.keySet()) {
             try {
-                this.serialisers.put(Class.forName(s), Class.forName(map.get(s)));
+                this.serialisers.put(Class.forName(s), Class.forName(serialisersMap.get(s)));
             } catch (final ClassNotFoundException e) {
                 e.printStackTrace();
             }
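For context on the change above: the serialiser mappings are now read from a nested "serialisers" object rather than from the top level of the python config JSON, so the config is expected to look roughly like this (the class names are placeholders, not taken from this commit):

```
{
  "serialisers": {
    "com.example.SomeGafferType": "com.example.SomeGafferTypePythonSerialiser"
  }
}
```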
@@ -97,9 +97,15 @@ def convertElement(input):
 class User(g.ToJson):
     CLASS='uk.gov.gchq.gaffer.user.User'
 
-    def __init__(self, user_id=None):
+    def __init__(self, user_id=None, data_auths=None):
         self._class_name=self.CLASS,
         self.user_id=user_id;
+        self.data_auths=data_auths;
 
     def to_json(self):
-        return {'userId': self.user_id}
+        json = {}
+        if self.user_id is not None:
+            json['userId'] = self.user_id
+        if self.data_auths is not None:
+            json['dataAuths'] = self.data_auths
+        return json
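A quick illustration of the new parameter (the user id and auth strings are made up):

```
user = User(user_id='some-user', data_auths=['authA', 'authB'])
user.to_json()   # {'userId': 'some-user', 'dataAuths': ['authA', 'authB']}
```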
