diff --git a/cmake/jemalloc.cmake b/cmake/jemalloc.cmake
index 6651fef..cfd11a0 100644
--- a/cmake/jemalloc.cmake
+++ b/cmake/jemalloc.cmake
@@ -24,15 +24,15 @@ ExternalProject_Add(jemalloc
 ExternalProject_Get_Property(jemalloc INSTALL_DIR)
 
 add_library(jemalloc_STATIC STATIC IMPORTED)
-set_property(TARGET jemalloc_STATIC PROPERTY IMPORTED_LOCATION ${INSTALL_DIR}/lib/libjemallloc.a)
+set_property(TARGET jemalloc_STATIC PROPERTY IMPORTED_LOCATION ${INSTALL_DIR}/src/jemalloc/lib/libjemalloc.a)
 add_dependencies(jemalloc_STATIC jemalloc)
 
 add_library(jemalloc_STATIC_PIC STATIC IMPORTED)
-set_property(TARGET jemalloc_STATIC_PIC PROPERTY IMPORTED_LOCATION ${INSTALL_DIR}/lib/libjemallloc_pic.a)
+set_property(TARGET jemalloc_STATIC_PIC PROPERTY IMPORTED_LOCATION ${INSTALL_DIR}/src/jemalloc/lib/libjemalloc_pic.a)
 add_dependencies(jemalloc_STATIC_PIC jemalloc)
 
 add_library(jemalloc_SHARED SHARED IMPORTED)
-set_property(TARGET jemalloc_SHARED PROPERTY IMPORTED_LOCATION ${INSTALL_DIR}/lib/libjemallloc.so)
+set_property(TARGET jemalloc_SHARED PROPERTY IMPORTED_LOCATION ${INSTALL_DIR}/src/jemalloc/lib/libjemalloc.so)
 add_dependencies(jemalloc_SHARED jemalloc)
 
 if (!APPLE)
diff --git a/euler/client/CMakeLists.txt b/euler/client/CMakeLists.txt
index def7102..48cd93b 100644
--- a/euler/client/CMakeLists.txt
+++ b/euler/client/CMakeLists.txt
@@ -11,7 +11,7 @@ add_library(client SHARED
   rpc_client.cc
   rpc_manager.cc
   status.cc)
-target_link_libraries(client common core proto glog)
+target_link_libraries(client common core proto glog jemalloc_STATIC_PIC)
 
 add_executable(graph_test graph_test.cc)
 target_link_libraries(graph_test client
diff --git a/euler/common/compact_weighted_collection.h b/euler/common/compact_weighted_collection.h
index 53f2153..5010d43 100644
--- a/euler/common/compact_weighted_collection.h
+++ b/euler/common/compact_weighted_collection.h
@@ -58,10 +58,10 @@ class CompactWeightedCollection : public WeightedCollection<T> {
   CompactWeightedCollection() {
   }
 
-  void Init(const std::vector<T>& ids,
+  bool Init(const std::vector<T>& ids,
             const std::vector<float>& weights) override;
 
-  void Init(const std::vector<std::pair<T, float>>& id_weight_pairs) override;
+  bool Init(const std::vector<std::pair<T, float>>& id_weight_pairs) override;
 
   std::pair<T, float> Sample() const override;
 
@@ -78,7 +78,7 @@ class CompactWeightedCollection : public WeightedCollection<T> {
 };
 
 template<class T>
-void CompactWeightedCollection<T>::Init(const std::vector<T>& ids,
+bool CompactWeightedCollection<T>::Init(const std::vector<T>& ids,
                                         const std::vector<float>& weights) {
   if (ids.size() == weights.size()) {
     sum_weight_ = 0.0;
@@ -89,13 +89,15 @@ void CompactWeightedCollection<T>::Init(const std::vector<T>& ids,
       sum_weight_ += weights[i];
       sum_weights_[i] = sum_weight_;
     }
+    return true;
   } else {
     LOG(ERROR) << "ids size != weights size, init error";
+    return false;
   }
 }
 
 template<class T>
-void CompactWeightedCollection<T>::Init(
+bool CompactWeightedCollection<T>::Init(
     const std::vector<std::pair<T, float>>& id_weight_pairs) {
   sum_weight_ = 0.0;
   ids_.resize(id_weight_pairs.size());
@@ -105,6 +107,7 @@ void CompactWeightedCollection<T>::Init(
     sum_weight_ += id_weight_pairs[i].second;
     sum_weights_[i] = sum_weight_;
   }
+  return true;
 }
 
 template<class T>
diff --git a/euler/common/fast_weighted_collection.h b/euler/common/fast_weighted_collection.h
index d329df4..374bd9e 100644
--- a/euler/common/fast_weighted_collection.h
+++ b/euler/common/fast_weighted_collection.h
@@ -27,10 +27,10 @@ namespace common {
 
 template<class T>
 class FastWeightedCollection : public WeightedCollection<T> {
  public:
-  void Init(const std::vector<T>& ids,
+  bool Init(const std::vector<T>& ids,
             const std::vector<float>& weights)
override; - void Init(const std::vector>& id_weight_pairs) override; + bool Init(const std::vector>& id_weight_pairs) override; std::pair Sample() const override; @@ -48,10 +48,10 @@ class FastWeightedCollection : public WeightedCollection { }; template -void FastWeightedCollection::Init(const std::vector& ids, +bool FastWeightedCollection::Init(const std::vector& ids, const std::vector& weights) { if (ids.size() != weights.size()) { - return; + return false; } ids_.resize(ids.size()); weights_.resize(weights.size()); @@ -66,11 +66,11 @@ void FastWeightedCollection::Init(const std::vector& ids, norm_weights[i] /= sum_weight_; } alias_.Init(norm_weights); - return; + return true; } template -void FastWeightedCollection::Init( +bool FastWeightedCollection::Init( const std::vector>& id_weight_pairs) { ids_.resize(id_weight_pairs.size()); weights_.resize(id_weight_pairs.size()); @@ -85,7 +85,7 @@ void FastWeightedCollection::Init( norm_weights[i] /= sum_weight_; } alias_.Init(norm_weights); - return; + return true; } template diff --git a/euler/common/weighted_collection.h b/euler/common/weighted_collection.h index 4fddc43..f9bb87b 100644 --- a/euler/common/weighted_collection.h +++ b/euler/common/weighted_collection.h @@ -28,9 +28,9 @@ template class WeightedCollection { public: virtual ~WeightedCollection() {} - virtual void Init(const std::vector& ids, + virtual bool Init(const std::vector& ids, const std::vector& weights) = 0; - virtual void Init( + virtual bool Init( const std::vector >& id_weight_pairs) = 0; virtual std::pair Sample() const = 0; virtual size_t GetSize() const = 0; diff --git a/euler/core/compact_edge.cc b/euler/core/compact_edge.cc index dadcbe0..52795c7 100644 --- a/euler/core/compact_edge.cc +++ b/euler/core/compact_edge.cc @@ -46,7 +46,7 @@ bool CompactEdge::DeSerialize(const char* s, size_t size) { } if (!bytes_reader.GetInt32(&type_) || // parse type !bytes_reader.GetFloat(&weight_)) { // parse weight - LOG(ERROR) << "edge info error"; + LOG(ERROR) << "edge info error, edge_id: " << src_id << "," << dst_id; return false; } @@ -55,12 +55,12 @@ bool CompactEdge::DeSerialize(const char* s, size_t size) { // parse uint64 feature int32_t uint64_feature_type_num = 0; if (!bytes_reader.GetInt32(&uint64_feature_type_num)) { - LOG(ERROR) << "uint64 feature type num error"; + LOG(ERROR) << "uint64 feature type num error, edge_id: " << src_id << "," << dst_id; return false; } if (!bytes_reader.GetInt32List(uint64_feature_type_num, &uint64_features_idx_)) { - LOG(ERROR) << "uint64 feature idx list error"; + LOG(ERROR) << "uint64 feature idx list error, edge_id: " << src_id << "," << dst_id; return false; } int32_t uint64_fv_num = 0; @@ -69,19 +69,19 @@ bool CompactEdge::DeSerialize(const char* s, size_t size) { uint64_features_idx_[i] = uint64_fv_num; } if (!bytes_reader.GetUInt64List(uint64_fv_num, &uint64_features_)) { - LOG(ERROR) << "uint64 feature value list error"; + LOG(ERROR) << "uint64 feature value list error, edge_id: " << src_id << "," << dst_id; return false; } // parse float feature int32_t float_feature_type_num = 0; if (!bytes_reader.GetInt32(&float_feature_type_num)) { - LOG(ERROR) << "float feature type num error"; + LOG(ERROR) << "float feature type num error, edge_id: " << src_id << "," << dst_id; return false; } if (!bytes_reader.GetInt32List(float_feature_type_num, &float_features_idx_)) { - LOG(ERROR) << "float feature idx list error"; + LOG(ERROR) << "float feature idx list error, edge_id: " << src_id << "," << dst_id; return false; } int32_t 
float_fv_num = 0; @@ -90,19 +90,19 @@ bool CompactEdge::DeSerialize(const char* s, size_t size) { float_features_idx_[i] = float_fv_num; } if (!bytes_reader.GetFloatList(float_fv_num, &float_features_)) { - LOG(ERROR) << "float feature value list error"; + LOG(ERROR) << "float feature value list error, edge_id: " << src_id << "," << dst_id; return false; } // parse binary feature int32_t binary_feature_type_num = 0; if (!bytes_reader.GetInt32(&binary_feature_type_num)) { - LOG(ERROR) << "binary feature type num error"; + LOG(ERROR) << "binary feature type num error, edge_id: " << src_id << "," << dst_id; return false; } if (!bytes_reader.GetInt32List(binary_feature_type_num, &binary_features_idx_)) { - LOG(ERROR) << "binary feature idx list error"; + LOG(ERROR) << "binary feature idx list error, edge_id: " << src_id << "," << dst_id; return false; } int32_t binary_fv_num = 0; @@ -111,7 +111,7 @@ bool CompactEdge::DeSerialize(const char* s, size_t size) { binary_features_idx_[i] = binary_fv_num; } if (!bytes_reader.GetString(binary_fv_num, &binary_features_)) { - LOG(ERROR) << "binary feature value list error"; + LOG(ERROR) << "binary feature value list error, edge_id: " << src_id << "," << dst_id; return false; } diff --git a/euler/core/compact_node.cc b/euler/core/compact_node.cc index db7b69f..5507f59 100644 --- a/euler/core/compact_node.cc +++ b/euler/core/compact_node.cc @@ -281,7 +281,7 @@ bool CompactNode::DeSerialize(const char* s, size_t size) { int32_t edge_group_num = 0; if (!bytes_reader.GetInt32(&edge_group_num)) { - LOG(ERROR) << "edge group num error"; + LOG(ERROR) << "edge group num error, node_id: " << id_; return false; } @@ -293,20 +293,22 @@ bool CompactNode::DeSerialize(const char* s, size_t size) { std::vector edge_group_size_list; if (!bytes_reader.GetInt32List(edge_group_num, &edge_group_size_list)) { - LOG(ERROR) << "edge group size list error"; + LOG(ERROR) << "edge group size list error, node_id: " << id_; return false; } std::vector edge_group_weight_list; if (!bytes_reader.GetFloatList(edge_group_num, &edge_group_weight_list)) { - LOG(ERROR) << "edge group weight list error"; + LOG(ERROR) << "edge group weight list error, node_id: " << id_; return false; } // build edge_group_collection_ - edge_group_collection_.Init(edge_group_ids, edge_group_weight_list); - + if (!edge_group_collection_.Init(edge_group_ids, edge_group_weight_list)) { + LOG(ERROR) << "edge group collection error, node_id: " << id_; + return false; + } // build neighbors info int32_t total_neighbors_num = 0; std::vector> ids_list(edge_group_num); @@ -317,7 +319,7 @@ bool CompactNode::DeSerialize(const char* s, size_t size) { ids_list[i] = std::vector(); if (!bytes_reader.GetUInt64List(edge_group_size_list[i], &ids_list[i])) { - LOG(ERROR) << "neighbor id list error"; + LOG(ERROR) << "neighbor id list error, node_id: " << id_; return false; } } @@ -327,7 +329,7 @@ bool CompactNode::DeSerialize(const char* s, size_t size) { weights_list[i] = std::vector(); if (!bytes_reader.GetFloatList(edge_group_size_list[i], &weights_list[i])) { - LOG(ERROR) << "neighbor weight list error"; + LOG(ERROR) << "neighbor weight list error, node_id: " << id_; return false; } } @@ -356,16 +358,15 @@ bool CompactNode::DeSerialize(const char* s, size_t size) { neighbors_weight_.push_back(sum_weight); } } - // parse uint64 feature int32_t uint64_feature_type_num = 0; if (!bytes_reader.GetInt32(&uint64_feature_type_num)) { - LOG(ERROR) << "uint64 feature type num error"; + LOG(ERROR) << "uint64 feature type num error, 
node_id: " << id_; return false; } if (!bytes_reader.GetInt32List(uint64_feature_type_num, &uint64_features_idx_)) { - LOG(ERROR) << "uint64 feature idx list error"; + LOG(ERROR) << "uint64 feature idx list error, node_id: " << id_; return false; } int32_t uint64_fv_num = 0; @@ -374,19 +375,19 @@ bool CompactNode::DeSerialize(const char* s, size_t size) { uint64_features_idx_[i] = uint64_fv_num; } if (!bytes_reader.GetUInt64List(uint64_fv_num, &uint64_features_)) { - LOG(ERROR) << "uint64 feature value list error"; + LOG(ERROR) << "uint64 feature value list error, node_id: " << id_; return false; } // parse float feature int32_t float_feature_type_num = 0; if (!bytes_reader.GetInt32(&float_feature_type_num)) { - LOG(ERROR) << "float feature type num error"; + LOG(ERROR) << "float feature type num error, node_id: " << id_; return false; } if (!bytes_reader.GetInt32List(float_feature_type_num, &float_features_idx_)) { - LOG(ERROR) << "float feature idx list error"; + LOG(ERROR) << "float feature idx list error, node_id: " << id_; return false; } int32_t float_fv_num = 0; @@ -395,19 +396,19 @@ bool CompactNode::DeSerialize(const char* s, size_t size) { float_features_idx_[i] = float_fv_num; } if (!bytes_reader.GetFloatList(float_fv_num, &float_features_)) { - LOG(ERROR) << "float feature value list error"; + LOG(ERROR) << "float feature value list error, node_id: " << id_; return false; } // parse binary feature int32_t binary_feature_type_num = 0; if (!bytes_reader.GetInt32(&binary_feature_type_num)) { - LOG(ERROR) << "binary feature type num error"; + LOG(ERROR) << "binary feature type num error, node_id: " << id_; return false; } if (!bytes_reader.GetInt32List(binary_feature_type_num, &binary_features_idx_)) { - LOG(ERROR) << "binary feature idx list error"; + LOG(ERROR) << "binary feature idx list error, node_id: " << id_; return false; } int32_t binary_fv_num = 0; @@ -416,10 +417,9 @@ bool CompactNode::DeSerialize(const char* s, size_t size) { binary_features_idx_[i] = binary_fv_num; } if (!bytes_reader.GetString(binary_fv_num, &binary_features_)) { - LOG(ERROR) << "binary feature value list error"; + LOG(ERROR) << "binary feature value list error, node_id: " << id_; return false; } - return true; } diff --git a/euler/core/fast_edge.cc b/euler/core/fast_edge.cc index ec679d8..7755dcd 100644 --- a/euler/core/fast_edge.cc +++ b/euler/core/fast_edge.cc @@ -46,7 +46,7 @@ bool FastEdge::DeSerialize(const char* s, size_t size) { if (!bytes_reader.GetInt32(&type_) || // parse type !bytes_reader.GetFloat(&weight_)) { // parse weight - LOG(ERROR) << "edge info error"; + LOG(ERROR) << "edge info error, edge_id: " << src_id << "," << dst_id; return false; } @@ -55,12 +55,12 @@ bool FastEdge::DeSerialize(const char* s, size_t size) { // parse uint64 feature int32_t uint64_feature_type_num = 0; if (!bytes_reader.GetInt32(&uint64_feature_type_num)) { - LOG(ERROR) << "uint64 feature type num error"; + LOG(ERROR) << "uint64 feature type num error, edge_node: " << src_id << "," << dst_id; return false; } if (!bytes_reader.GetInt32List(uint64_feature_type_num, &uint64_features_idx_)) { - LOG(ERROR) << "uint64 feature idx list error"; + LOG(ERROR) << "uint64 feature idx list error, edge_node: " << src_id << "," << dst_id; return false; } int32_t uint64_fv_num = 0; @@ -69,19 +69,19 @@ bool FastEdge::DeSerialize(const char* s, size_t size) { uint64_features_idx_[i] = uint64_fv_num; } if (!bytes_reader.GetUInt64List(uint64_fv_num, &uint64_features_)) { - LOG(ERROR) << "uint64 feature value list error"; + 
LOG(ERROR) << "uint64 feature value list error, edge_node: " << src_id << "," << dst_id; return false; } // parse float feature int32_t float_feature_type_num = 0; if (!bytes_reader.GetInt32(&float_feature_type_num)) { - LOG(ERROR) << "float feature type num error"; + LOG(ERROR) << "float feature type num error, edge_node: " << src_id << "," << dst_id; return false; } if (!bytes_reader.GetInt32List(float_feature_type_num, &float_features_idx_)) { - LOG(ERROR) << "float feature idx list error"; + LOG(ERROR) << "float feature idx list error, edge_node: " << src_id << "," << dst_id; return false; } int32_t float_fv_num = 0; @@ -90,19 +90,19 @@ bool FastEdge::DeSerialize(const char* s, size_t size) { float_features_idx_[i] = float_fv_num; } if (!bytes_reader.GetFloatList(float_fv_num, &float_features_)) { - LOG(ERROR) << "float feature value list error"; + LOG(ERROR) << "float feature value list error, edge_node: " << src_id << "," << dst_id; return false; } // parse binary feature int32_t binary_feature_type_num = 0; if (!bytes_reader.GetInt32(&binary_feature_type_num)) { - LOG(ERROR) << "binary feature type num error"; + LOG(ERROR) << "binary feature type num error, edge_node: " << src_id << "," << dst_id; return false; } if (!bytes_reader.GetInt32List(binary_feature_type_num, &binary_features_idx_)) { - LOG(ERROR) << "binary feature idx list error"; + LOG(ERROR) << "binary feature idx list error, edge_node: " << src_id << "," << dst_id; return false; } int32_t binary_fv_num = 0; @@ -111,7 +111,7 @@ bool FastEdge::DeSerialize(const char* s, size_t size) { binary_features_idx_[i] = binary_fv_num; } if (!bytes_reader.GetString(binary_fv_num, &binary_features_)) { - LOG(ERROR) << "binary feature value list error"; + LOG(ERROR) << "binary feature value list error, edge_node: " << src_id << "," << dst_id; return false; } diff --git a/euler/core/fast_node.cc b/euler/core/fast_node.cc index b111a45..914f33d 100644 --- a/euler/core/fast_node.cc +++ b/euler/core/fast_node.cc @@ -263,7 +263,7 @@ bool FastNode::DeSerialize(const char* s, size_t size) { int32_t edge_group_num = 0; if (!bytes_reader.GetInt32(&edge_group_num)) { - LOG(ERROR) << "edge group num error"; + LOG(ERROR) << "edge group num error, node_id: " << id_; return false; } @@ -275,25 +275,28 @@ bool FastNode::DeSerialize(const char* s, size_t size) { std::vector edge_group_size_list; if (!bytes_reader.GetInt32List(edge_group_num, &edge_group_size_list)) { - LOG(ERROR) << "edge group size list error"; + LOG(ERROR) << "edge group size list error, node_id: " << id_; return false; } std::vector edge_group_weight_list; if (!bytes_reader.GetFloatList(edge_group_num, &edge_group_weight_list)) { - LOG(ERROR) << "edge group weight list error"; + LOG(ERROR) << "edge group weight list error, node_id: " << id_; return false; } // build edge_group_collection_ - edge_group_collection_.Init(edge_group_ids, edge_group_weight_list); + if (!edge_group_collection_.Init(edge_group_ids, edge_group_weight_list)) { + LOG(ERROR) << "edge group weight collection error, node_id: " << id_; + return false; + } // build neighbor_collection_list_ std::vector> ids_list(edge_group_num); for (int32_t i = 0; i < edge_group_num; ++i) { ids_list[i] = std::vector(); if (!bytes_reader.GetUInt64List(edge_group_size_list[i], &ids_list[i])) { - LOG(ERROR) << "neighbor id list error"; + LOG(ERROR) << "neighbor id list error, node_id: " << id_; return false; } } @@ -302,7 +305,7 @@ bool FastNode::DeSerialize(const char* s, size_t size) { weights_list[i] = std::vector(); if 
(!bytes_reader.GetFloatList(edge_group_size_list[i], &weights_list[i])) { - LOG(ERROR) << "neighbor weight list error"; + LOG(ERROR) << "neighbor weight list error, node_id: " << id_; return false; } } @@ -330,40 +333,40 @@ bool FastNode::DeSerialize(const char* s, size_t size) { // parse uint64 feature int32_t uint64_feature_type_num = 0; if (!bytes_reader.GetInt32(&uint64_feature_type_num)) { - LOG(ERROR) << "uint64 feature type num error"; + LOG(ERROR) << "uint64 feature type num error, node_id: " << id_; return false; } std::vector uint64_feature_size_list; uint64_features_.resize(uint64_feature_type_num); if (!bytes_reader.GetInt32List(uint64_feature_type_num, &uint64_feature_size_list)) { - LOG(ERROR) << "uint64 feature idx list error"; + LOG(ERROR) << "uint64 feature idx list error, node_id: " << id_; return false; } for (int32_t i = 0; i < uint64_feature_type_num; ++i) { if (!bytes_reader.GetUInt64List(uint64_feature_size_list[i], &uint64_features_[i])) { - LOG(ERROR) << "uint64 feature value list error"; + LOG(ERROR) << "uint64 feature value list error, node_id: " << id_; return false; } } // parse float feature int32_t float_feature_type_num = 0; if (!bytes_reader.GetInt32(&float_feature_type_num)) { - LOG(ERROR) << "float feature type num error"; + LOG(ERROR) << "float feature type num error, node_id_: " << id_; return false; } std::vector float_feature_size_list; float_features_.resize(float_feature_type_num); if (!bytes_reader.GetInt32List(float_feature_type_num, &float_feature_size_list)) { - LOG(ERROR) << "float feature idx list error"; + LOG(ERROR) << "float feature idx list error, node_id_: " << id_; return false; } for (int32_t i = 0; i < float_feature_type_num; ++i) { if (!bytes_reader.GetFloatList(float_feature_size_list[i], &float_features_[i])) { - LOG(ERROR) << "float feature value list error"; + LOG(ERROR) << "float feature value list error, node_id: " << id_; return false; } } @@ -371,20 +374,20 @@ bool FastNode::DeSerialize(const char* s, size_t size) { // parse binary feature int32_t binary_feature_type_num = 0; if (!bytes_reader.GetInt32(&binary_feature_type_num)) { - LOG(ERROR) << "binary feature type num error"; + LOG(ERROR) << "binary feature type num error, node_id: " << id_; return false; } std::vector binary_feature_size_list; binary_features_.resize(binary_feature_type_num); if (!bytes_reader.GetInt32List(binary_feature_type_num, &binary_feature_size_list)) { - LOG(ERROR) << "binary feature idx list error"; + LOG(ERROR) << "binary feature idx list error, node_id: " << id_; return false; } for (int32_t i = 0; i < binary_feature_type_num; ++i) { if (!bytes_reader.GetString(binary_feature_size_list[i], &binary_features_[i])) { - LOG(ERROR) << "binary feature value list error"; + LOG(ERROR) << "binary feature value list error, node_id: " << id_; return false; } } diff --git a/euler/core/graph.h b/euler/core/graph.h index e05e095..0408dee 100644 --- a/euler/core/graph.h +++ b/euler/core/graph.h @@ -36,6 +36,8 @@ namespace core { class Graph { public: Graph() { + node_map_.reserve(10000000); + edge_map_.reserve(10000000); node_type_num_ = 0; edge_type_num_ = 0; global_sampler_ok_ = false; @@ -90,6 +92,22 @@ class Graph { return true; } + virtual bool AddNodeFrom(const std::unordered_map& map) { + node_map_insert_lock_.lock(); + node_map_.insert(map.begin(), map.end()); + node_map_insert_lock_.unlock(); + return true; + } + + virtual bool AddNodeFrom(const std::vector& vec) { + node_map_insert_lock_.lock(); + for(auto &it:vec){ + node_map_.insert({it->GetID(), 
it}); + } + node_map_insert_lock_.unlock(); + return true; + } + virtual bool AddEdge(Edge* e) { euler::common::EdgeID edge_id = e->GetID(); edge_map_insert_lock_.lock(); @@ -98,6 +116,38 @@ class Graph { return true; } + virtual bool AddEdgeFrom(const std::unordered_map& map) { + edge_map_insert_lock_.lock(); + edge_map_.insert(map.begin(), map.end()); + edge_map_insert_lock_.unlock(); + return true; + } + + virtual bool AddEdgeFrom(const std::vector& vec) { + edge_map_insert_lock_.lock(); + for(auto &it:vec){ + edge_map_.insert({it->GetID(),it}); + } + edge_map_insert_lock_.unlock(); + return true; + } + + int64_t getNodeSize() { + return node_map_.size(); + } + + int64_t getEdgeSize() { + return edge_map_.size(); + } + + void reserveNodeMap(size_t size) { + node_map_.reserve(size); + } + + void reserveEdgeMap(size_t size){ + edge_map_.reserve(size); + } virtual bool BuildGlobalSampler() = 0; virtual bool BuildGlobalEdgeSampler() = 0; diff --git a/euler/core/graph_builder.cc b/euler/core/graph_builder.cc index 1e538e4..4730aa0 100644 --- a/euler/core/graph_builder.cc +++ b/euler/core/graph_builder.cc @@ -17,6 +17,7 @@ limitations under the License. #include #include +#include #include #include @@ -26,14 +27,14 @@ limitations under the License. #include "euler/common/bytes_reader.h" -#define THREAD_NUM 100 - namespace euler { namespace core { bool GraphBuilder::LoadData(LoaderType loader_type, const std::vector& file_list, - Graph* graph, std::string addr, int32_t port) { + Graph* graph, std::string addr, int32_t port, + NODEVEC &np, EDGEVEC &ep, + int32_t& n_type_num, int32_t& e_type_num) { for (size_t i = 0; i < file_list.size(); ++i) { euler::common::FileIO* reader = nullptr; euler::common::FileIO::ConfigMap config; @@ -70,7 +71,7 @@ bool GraphBuilder::LoadData(LoaderType loader_type, } int32_t block_size = 0; while (reader->Read(&block_size)) { - if (!ParseBlock(reader, graph, block_size)) { + if (!ParseBlock(reader, graph, block_size, np, ep, n_type_num, e_type_num)) { LOG(ERROR) << file_list[i] << " data error!"; return false; } @@ -87,10 +88,17 @@ Graph* GraphBuilder::BuildGraph(const std::vector& file_names, LoaderType loader_type, std::string addr, int32_t port, GlobalSamplerType global_sampler_type) { + int THREAD_NUM = std::thread::hardware_concurrency(); Graph* graph = factory_->CreateGraph(); bool load_success = true; std::vector thread_list; int p_num = file_names.size() / THREAD_NUM + 1; + NODEVEC tmp_node_vec[THREAD_NUM]; + EDGEVEC tmp_edge_vec[THREAD_NUM]; + int32_t n_type_num[THREAD_NUM]; + int32_t e_type_num[THREAD_NUM]; + memset(n_type_num, 0, sizeof(n_type_num)); + memset(e_type_num, 0, sizeof(e_type_num)); for (int i = 0; i < THREAD_NUM; ++i) { std::vector file_list; int j = i * p_num; @@ -99,20 +107,45 @@ Graph* GraphBuilder::BuildGraph(const std::vector& file_names, file_list.push_back(file_names[j]); } - // LOG(INFO) << "Thread " << i << ", job size: " << file_list.size(); + //LOG(INFO) << "Thread " << i << ", job size: " << file_list.size(); thread_list.push_back(std::thread( - [this, loader_type, file_list, graph, addr, port] (bool* success) { - *success = *success && LoadData(loader_type, file_list, graph, addr, port); + [this, loader_type, file_list, graph, addr, port,i, &tmp_node_vec, &tmp_edge_vec, + &n_type_num, &e_type_num] (bool* success) { + *success = *success && LoadData(loader_type, file_list, graph, addr, port, tmp_node_vec[i], + tmp_edge_vec[i], n_type_num[i], e_type_num[i]); }, &load_success)); } - for (size_t i = 0; i < THREAD_NUM; ++i) { + for 
(int i = 0; i < THREAD_NUM; ++i) { thread_list[i].join(); } + int64_t node_size = 0, edge_size = 0; + for(int i = 0 ; i < THREAD_NUM ; i++) { + node_size += tmp_node_vec[i].size(); + edge_size += tmp_edge_vec[i].size(); + } + LOG(INFO) << "Each Thread Load Finish! Node Count:" << node_size<< " Edge Count:"<< edge_size; + + if (!load_success) { LOG(ERROR) << "Graph build failed!"; return nullptr; } + else { + LOG(INFO) << "Graph Loading Finish!"; + } + + for(int i = 0 ; i < THREAD_NUM ; i++) { + graph->AddNodeFrom(tmp_node_vec[i]); + tmp_node_vec[i].clear(); + graph->AddEdgeFrom(tmp_edge_vec[i]); + tmp_edge_vec[i].clear(); + graph->SetNodeTypeNum(n_type_num[i]); + graph->SetEdgeTypeNum(e_type_num[i]); + } + + LOG(INFO) << "Graph Load Finish! Node Count:" << graph->getNodeSize()<< " Edge Count:" + << graph->getEdgeSize(); if (global_sampler_type == node) { graph->BuildGlobalSampler(); @@ -131,7 +164,8 @@ Graph* GraphBuilder::BuildGraph(const std::vector& file_names, } bool GraphBuilder::ParseBlock(euler::common::FileIO* file_io, Graph* graph, - int32_t checksum) { + int32_t checksum, NODEVEC &np, EDGEVEC &ep, + int32_t& n_type_num, int32_t& e_type_num) { int32_t node_info_bytes = 0; std::string node_info; if (!file_io->Read(&node_info_bytes)) { @@ -147,8 +181,9 @@ bool GraphBuilder::ParseBlock(euler::common::FileIO* file_io, Graph* graph, return false; } - graph->AddNode(node); - graph->SetNodeTypeNum(node->GetType() + 1); + np.push_back(node); + int tmp = node->GetType() + 1; + n_type_num = tmp > n_type_num ? tmp: n_type_num; int32_t edges_num = 0; if (!file_io->Read(&edges_num)) { @@ -169,8 +204,9 @@ bool GraphBuilder::ParseBlock(euler::common::FileIO* file_io, Graph* graph, if (!edge->DeSerialize(edge_info)) { return false; } - graph->AddEdge(edge); - graph->SetEdgeTypeNum(edge->GetType() + 1); + ep.push_back(edge); + int tmp_e = edge->GetType() + 1; + e_type_num = tmp_e > e_type_num ? 
tmp_e: e_type_num; } int32_t total_edges_info_bytes = 0; for (size_t i = 0; i < edges_info_bytes.size(); ++i) { diff --git a/euler/core/graph_builder.h b/euler/core/graph_builder.h index b3583fc..fb43e05 100644 --- a/euler/core/graph_builder.h +++ b/euler/core/graph_builder.h @@ -34,6 +34,12 @@ namespace core { enum LoaderType {local, hdfs}; enum GlobalSamplerType {edge, node, all, none}; +typedef std::unordered_map NODEMAP; +typedef std::vector NODEVEC; + +typedef std::unordered_map EDGEMAP; +typedef std::vector EDGEVEC; class GraphBuilder { public: explicit GraphBuilder(GraphFactory* factory) : factory_(factory) {} @@ -46,12 +52,14 @@ class GraphBuilder { GlobalSamplerType global_sampler_type); private: - bool ParseBlock(euler::common::FileIO* file_io, Graph* graph, int32_t checksum); + bool ParseBlock(euler::common::FileIO* file_io, Graph* graph, int32_t checksum, + NODEVEC &np, EDGEVEC &ep, int32_t& n_type_num, int32_t& e_type_num); bool LoadData( LoaderType loader_type, const std::vector& file_list, - Graph* graph, std::string addr, int32_t port); + Graph* graph, std::string addr, int32_t port,NODEVEC &np, EDGEVEC &ep, + int32_t& n_type_num, int32_t& e_type_num); private: std::unique_ptr factory_; diff --git a/euler/service/CMakeLists.txt b/euler/service/CMakeLists.txt index fd6bba1..8de9a1c 100644 --- a/euler/service/CMakeLists.txt +++ b/euler/service/CMakeLists.txt @@ -4,7 +4,7 @@ add_library(service STATIC graph_service.cc call_data.cc) target_link_libraries(service core common proto) add_library(euler_service SHARED graph_service.cc call_data.cc python_api.cc) -target_link_libraries(euler_service core common proto) +target_link_libraries(euler_service core common proto jemalloc_STATIC_PIC) set(LIB_DST ${CMAKE_CURRENT_SOURCE_DIR}/../python) add_custom_command(TARGET euler_service POST_BUILD diff --git a/tf_euler/CMakeLists.txt b/tf_euler/CMakeLists.txt index daf2003..78b011d 100644 --- a/tf_euler/CMakeLists.txt +++ b/tf_euler/CMakeLists.txt @@ -50,7 +50,7 @@ add_library(tf_euler SHARED utils/create_graph.cc) -target_link_libraries(tf_euler client common ${TF_FRAMEWORK}) +target_link_libraries(tf_euler client common ${TF_FRAMEWORK} jemalloc_STATIC_PIC) target_include_directories(tf_euler BEFORE PRIVATE ${TF_INC_DIR} "${TF_INC_DIR}/external/nsync/public/") diff --git a/tf_euler/python/aggregators.py b/tf_euler/python/aggregators.py index 79604df..4b3abcf 100644 --- a/tf_euler/python/aggregators.py +++ b/tf_euler/python/aggregators.py @@ -58,7 +58,7 @@ def call(self, inputs): else: return tf.add(from_self, from_neighs) - def aggregate(self): + def aggregate(self, inputs): raise NotImplementedError() diff --git a/tf_euler/python/base_layers.py b/tf_euler/python/base_layers.py new file mode 100644 index 0000000..71d09bb --- /dev/null +++ b/tf_euler/python/base_layers.py @@ -0,0 +1,163 @@ +# Copyright 2018 Alibaba Group Holding Limited. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================== + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import collections + +import tensorflow as tf + +from tensorflow.python.util import nest + + +_LAYER_UIDS = collections.defaultdict(lambda: 0) + +def get_layer_uid(layer_name=''): + _LAYER_UIDS[layer_name] += 1 + return _LAYER_UIDS[layer_name] + + +class Layer(object): + """ + Layer class modeled after Keras (http://keras.io). + """ + + def __init__(self, name=None, **kwargs): + self.built = False + + if name is None: + layer_name = self.__class__.__name__.lower() + name = layer_name + '_' + str(get_layer_uid(layer_name)) + + self._name = name + + def build(self, input_shape): + self.built = True + + def call(self, inputs): + return inputs + + def __call__(self, inputs): + input_shapes = None + if all(hasattr(x, 'shape') for x in nest.flatten(inputs)): + input_shapes = nest.map_structure(lambda x: x.shape, inputs) + + with tf.variable_scope(self._name): + if not self.built: + self.build(input_shapes) + outputs = self.call(inputs) + return outputs + + def compute_output_shape(self, input_shape): + raise NotImplementedError() + + +class Dense(Layer): + """ + Basic full-connected layer. + """ + + def __init__(self, + dim, + activation=None, + use_bias=True, + kernel_initializer=lambda: tf.uniform_unit_scaling_initializer(factor=0.36), + bias_initializer=lambda: tf.constant_initializer(value=0.0002), + **kwargs): + super(Dense, self).__init__(**kwargs) + self.dim = dim + self.activation = activation + self.use_bias = use_bias + self.kernel_initializer = kernel_initializer + self.bias_initializer = bias_initializer + + def build(self, input_shape): + input_shape = tf.TensorShape(input_shape) + self.kernel = tf.get_variable( + 'kernel', + shape=[input_shape[-1].value, self.dim], + initializer=self.kernel_initializer()) + if self.use_bias: + self.bias = tf.get_variable( + 'bias', + shape=[self.dim], + initializer=self.bias_initializer()) + else: + self.bias = None + self.built = True + + def call(self, inputs): + rank = inputs.shape.ndims + if rank > 2: + outputs = tf.tensordot(inputs, self.kernel, [[rank - 1], [0]]) + else: + outputs = tf.matmul(inputs, self.kernel) + if self.use_bias: + outputs = tf.nn.bias_add(outputs, self.bias) + if self.activation: + outputs = self.activation(outputs) + return outputs + + +class Embedding(Layer): + """ + Id to dense vector embedding. + """ + + def __init__(self, + max_id, + dim, + initializer=lambda: tf.truncated_normal_initializer(stddev=0.1), + **kwargs): + super(Embedding, self).__init__(**kwargs) + self.max_id = max_id + self.dim = dim + self.initializer = initializer + + def build(self, input_shape): + self.embeddings = tf.get_variable( + 'embeddings', + shape=[self.max_id + 1, self.dim], + initializer=self.initializer()) + self.built = True + + def call(self, inputs): + shape = inputs.shape + inputs = tf.reshape(inputs,[-1]) + output_shape = shape.concatenate(self.dim) + output_shape = [d if d is not None else -1 for d in output_shape.as_list()] + return tf.reshape(tf.nn.embedding_lookup(self.embeddings, inputs),output_shape) + + +class SparseEmbedding(Embedding): + """ + Sparse id to dense vector embedding. 
+ """ + def __init__( + self, + max_id, + dim, + initializer=lambda: tf.truncated_normal_initializer(stddev=0.0002), + combiner='sum', + **kwargs): + super(SparseEmbedding, self).__init__( + max_id=max_id, dim=dim, initializer=initializer, **kwargs) + self.combiner = combiner + + def call(self, inputs): + return tf.nn.embedding_lookup_sparse(self.embeddings, inputs, None, + combiner=self.combiner) diff --git a/tf_euler/python/encoders.py b/tf_euler/python/encoders.py index c339efc..90a47d9 100644 --- a/tf_euler/python/encoders.py +++ b/tf_euler/python/encoders.py @@ -32,31 +32,134 @@ class ShallowEncoder(layers.Layer): Basic encoder combining embedding of node id and dense feature. """ - def __init__(self, dim, feature_idx=-1, feature_dim=0, max_id=-1, - use_feature=True, use_id=False, **kwargs): + def __init__(self, dim=None, feature_idx=-1, feature_dim=0, max_id=-1, + sparse_feature_idx=-1, sparse_feature_max_id=-1, + embedding_dim=16, use_hash_embedding=False, combiner='concat', + **kwargs): super(ShallowEncoder, self).__init__(**kwargs) - if not use_feature and not use_id: - raise ValueError('Either use_feature or use_id must be True.') + + if combiner not in ['add', 'concat']: + raise ValueError('combiner must be \'add\' or \'concat\'.') + if combiner == 'add' and dim is None: + raise ValueError('add must be used with dim provided.') + + use_feature = feature_idx != -1 + use_id = max_id != -1 + use_sparse_feature = sparse_feature_idx != -1 + + if isinstance(feature_idx, int) and use_feature: + feature_idx = [feature_idx] + if isinstance(feature_dim, int) and use_feature: + feature_dim = [feature_dim] + if use_feature and len(feature_idx) != len(feature_dim): + raise ValueError('feature_dim must be the same length as feature_idx.') + + if isinstance(sparse_feature_idx, int) and use_sparse_feature: + sparse_feature_idx = [sparse_feature_idx] + if isinstance(sparse_feature_max_id, int) and use_sparse_feature: + sparse_feature_max_id = [sparse_feature_max_id] + if use_sparse_feature and \ + len(sparse_feature_idx) != len(sparse_feature_max_id): + raise ValueError('sparse_feature_idx must be the same length as' + 'sparse_feature_max_id.') + + embedding_num = (1 if use_id else 0) + \ + (len(sparse_feature_idx) if use_sparse_feature else 0) + + if combiner == 'add': + embedding_dim = dim + if isinstance(embedding_dim, int) and embedding_num: + embedding_dim = [embedding_dim] * embedding_num + if embedding_num and len(embedding_dim) != embedding_num: + raise ValueError('length of embedding_num must be int(use_id) + ' + 'len(sparse_feature_idx)') + + if isinstance(use_hash_embedding, bool) and embedding_num: + use_hash_embedding = [use_hash_embedding] * embedding_num + if embedding_num and len(use_hash_embedding) != embedding_num: + raise ValueError('length of use_hash_embedding must be int(use_id) + ' + 'len(sparse_feature_idx)') + + # model architechture self.dim = dim - self.use_id = use_feature + self.use_id = use_id self.use_feature = use_feature - if use_id: - self.embedding = layers.Embedding(dim, max_id) - if use_feature: - self.dense = layers.Dense(self.dim) + self.use_sparse_feature = use_sparse_feature + self.combiner = combiner + + # feature fetching parameters self.feature_idx = feature_idx self.feature_dim = feature_dim + self.sparse_feature_idx = sparse_feature_idx + self.sparse_feature_max_id = sparse_feature_max_id + self.embedding_dim = embedding_dim + # sub-layers + if dim: + self.dense = layers.Dense(self.dim, use_bias=False) + + if use_id: + embedding_class = \ + 
layers.HashEmbedding if use_hash_embedding[0] else layers.Embedding + self.embedding = embedding_class(max_id + 1, embedding_dim[0]) + embedding_dim = embedding_dim[1:] + use_hash_embedding = use_hash_embedding[1:] + if use_sparse_feature: + self.sparse_embeddings = [] + for max_id, dim, use_hash in zip( + sparse_feature_max_id, embedding_dim, use_hash_embedding): + sparse_embedding_class = \ + layers.HashSparseEmbedding if use_hash else layers.SparseEmbedding + self.sparse_embeddings.append( + sparse_embedding_class(max_id + 1, dim)) + + @property + def output_dim(self): + if self.dim is not None: + return self.dim + + output_dim = 0 + if self.use_feature: + output_dim += sum(self.feature_dim) + if self.use_id or self.use_sparse_feature: + output_dim += sum(self.embedding_dim) + return output_dim def call(self, inputs): + input_shape = inputs.shape + inputs = tf.reshape(inputs, [-1]) embeddings = [] + if self.use_id: embeddings.append(self.embedding(inputs)) + if self.use_feature: - feature = euler_ops.get_dense_feature( - inputs, [self.feature_idx], [self.feature_dim])[0] - embeddings.append(self.dense(feature)) - return tf.add_n(embeddings) + features = euler_ops.get_dense_feature( + inputs, self.feature_idx, self.feature_dim) + features = tf.concat(features, -1) + if self.combiner == 'add': + features = self.dense(features) + embeddings.append(features) + + if self.use_sparse_feature: + default_values = [max_id + 1 for max_id in self.sparse_feature_max_id] + sparse_features = euler_ops.get_sparse_feature( + inputs, self.sparse_feature_idx, default_values=default_values) + embeddings.extend([ + sparse_embedding(sparse_feature) + for sparse_embedding, sparse_feature + in zip(self.sparse_embeddings, sparse_features) + ]) + + if self.combiner == 'add': + embedding = tf.add_n(embeddings) + else: + embedding = tf.concat(embeddings, -1) + if self.dim: + embedding = self.dense(embedding) + output_shape = input_shape.concatenate(self.output_dim) + output_shape = [d if d is not None else -1 for d in output_shape.as_list()] + return tf.reshape(embedding, output_shape) class GCNEncoder(layers.Layer): @@ -65,21 +168,23 @@ class GCNEncoder(layers.Layer): """ def __init__(self, metapath, dim, aggregator='mean', - feature_idx=-1, feature_dim=0, max_id=-1, - use_feature=True, use_id=False, use_residual=False, **kwargs): + feature_idx=-1, feature_dim=0, max_id=-1, use_id=False, + sparse_feature_idx=-1, sparse_feature_max_id=-1, + embedding_dim=16, use_hash_embedding=False, use_residual=False, + **kwargs): super(GCNEncoder, self).__init__(**kwargs) self.metapath = metapath self.num_layers = len(metapath) - self.use_id = use_id - self.use_feature = use_feature self.use_residual = use_residual - if use_id: - self.id_layer = layers.Embedding(max_id, dim) - if use_feature and use_residual: - self.feature_layer = layers.Dense(dim, use_bias=False) - self.feature_idx = feature_idx - self.feature_dim = feature_dim + self._node_encoder = ShallowEncoder( + dim=dim if use_residual else None, + feature_idx=feature_idx, feature_dim=feature_dim, + max_id=max_id if use_id else -1, + sparse_feature_idx=sparse_feature_idx, + sparse_feature_max_id=sparse_feature_max_id, + embedding_dim=embedding_dim, use_hash_embedding=use_hash_embedding, + combiner='add' if use_residual else 'concat') self.aggregators = [] aggregator_class = sparse_aggregators.get(aggregator) @@ -88,22 +193,7 @@ def __init__(self, metapath, dim, aggregator='mean', self.aggregators.append(aggregator_class(dim, activation=activation)) def 
node_encoder(self, inputs): - embeddings = [] - - if self.use_id: - embeddings.append(self.id_layer(inputs)) - - if self.use_feature: - from_feature = euler_ops.get_dense_feature( - inputs, [self.feature_idx], [self.feature_dim])[0] - if self.use_residual: - from_feature = self.feature_layer(from_feature) - embeddings.append(from_feature) - - if self.use_residual: - return tf.add_n(embeddings) - else: - return tf.concat(embeddings, 1) + return self._node_encoder(inputs) def call(self, inputs): nodes, adjs = euler_ops.get_multi_hop_neighbor(inputs, self.metapath) @@ -127,13 +217,16 @@ def call(self, inputs): class ScalableGCNEncoder(GCNEncoder): def __init__(self, edge_type, num_layers, dim, aggregator='mean', - feature_idx=-1, feature_dim=0, max_id=-1, - use_feature=True, use_id=False, use_residual=False, + feature_idx=-1, feature_dim=0, max_id=-1, use_id=False, + sparse_feature_idx=-1, sparse_feature_max_id=-1, + embedding_dim=16, use_hash_embedding=False, use_residual=False, store_learning_rate=0.001, store_init_maxval=0.05, **kwargs): metapath = [edge_type] * num_layers super(ScalableGCNEncoder, self).__init__( - metapath, dim, aggregator, feature_idx, feature_dim, max_id, - use_feature, use_id, use_residual, **kwargs) + metapath, dim, aggregator, + feature_idx, feature_dim, max_id, use_id, + sparse_feature_idx, sparse_feature_max_id, + embedding_dim, use_hash_embedding, use_residual, **kwargs) self.dim = dim self.edge_type = edge_type self.max_id = max_id @@ -222,7 +315,8 @@ def _optimize_store(self, node, node_embeddings): embedding_gradient = tf.nn.embedding_lookup(gradient_store, node) with tf.control_dependencies([embedding_gradient]): clear_ops.append( - utils_embedding.embedding_update(gradient_store, node, 0)) + utils_embedding.embedding_update(gradient_store, node, + tf.zeros_like(embedding_gradient))) losses.append(tf.reduce_sum(node_embedding * embedding_gradient)) store_loss = tf.add_n(losses) @@ -249,45 +343,49 @@ def create_aggregators(dim, num_layers, aggregator, **kwargs): def __init__(self, metapath, fanouts, dim, aggregator='mean', concat=False, shared_aggregators=None, feature_idx=-1, feature_dim=0, max_id=-1, - use_feature=True, use_id=False, **kwargs): + use_feature=None, use_id=False, + sparse_feature_idx=-1, sparse_feature_max_id=-1, + embedding_dim=16, use_hash_embedding=False, + shared_node_encoder=None, use_residual=False, **kwargs): super(SageEncoder, self).__init__(**kwargs) if len(metapath) != len(fanouts): raise ValueError('Len of metapath must be the same as fanouts.') + if use_feature is not None or use_id is not None: + tf.logging.warning('use_feature is deprecated ' + 'and would not have any effect.') + self.metapath = metapath self.fanouts = fanouts self.num_layers = len(metapath) self.concat = concat - layer0_dim = (feature_dim if use_feature else 0) + (dim if use_id else 0) + if shared_node_encoder: + self._node_encoder = shared_node_encoder + else: + self._node_encoder = ShallowEncoder( + feature_idx=feature_idx, feature_dim=feature_dim, + max_id=max_id if use_id else -1, + sparse_feature_idx=sparse_feature_idx, + sparse_feature_max_id=sparse_feature_max_id, + embedding_dim=embedding_dim, use_hash_embedding=use_hash_embedding) + + layer0_dim = self._node_encoder.output_dim self.dims = [layer0_dim] + [dim] * self.num_layers - self.use_id = use_id - self.use_feature = use_feature - if use_id: - self.embedding = layers.Embedding(max_id, dim) - self.feature_idx = feature_idx - self.feature_dim = feature_dim - if shared_aggregators is not None: 
self.aggregators = shared_aggregators else: self.aggregators = self.create_aggregators( dim, self.num_layers, aggregator, concat=concat) - def node_encoder(self, inputs): - if self.use_id: - id_embedding = self.embedding(inputs) - if not self.use_feature: - return id_embedding + self._max_id = max_id - feature = euler_ops.get_dense_feature(inputs, [self.feature_idx], - [self.feature_dim])[0] - if self.use_id: - feature = tf.concat([feature, id_embedding], 1) - return feature + def node_encoder(self, inputs): + return self._node_encoder(inputs) def call(self, inputs): - samples = euler_ops.sample_fanout(inputs, self.metapath, self.fanouts, default_node=0)[0] + samples = euler_ops.sample_fanout( + inputs, self.metapath, self.fanouts, default_node=self._max_id + 1)[0] hidden = [self.node_encoder(sample) for sample in samples] for layer in range(self.num_layers): aggregator = self.aggregators[layer] @@ -312,12 +410,18 @@ def __init__(self, edge_type, fanout, num_layers, dim, aggregator='mean', concat=False, shared_aggregators=None, feature_idx=-1, feature_dim=0, max_id=-1, use_feature=True, use_id=False, + sparse_feature_idx=-1, sparse_feature_max_id=-1, + embedding_dim=16, use_hash_embedding=False, + shared_node_encoder=None, use_residual=False, store_learning_rate=0.001, store_init_maxval=0.05, **kwargs): metapath = [edge_type] * num_layers fanouts = [fanout] * num_layers super(ScalableSageEncoder, self).__init__( metapath, fanouts, dim, aggregator, concat, shared_aggregators, - feature_idx, feature_dim, max_id, use_feature, use_id, **kwargs) + feature_idx, feature_dim, max_id, use_feature, use_id, + sparse_feature_idx, sparse_feature_max_id, + embedding_dim, use_hash_embedding, shared_node_encoder, use_residual, + **kwargs) self.edge_type = edge_type self.fanout = fanout self.max_id = max_id @@ -330,13 +434,15 @@ def build(self, input_shape): [self.max_id + 2, dim], initializer=tf.random_uniform_initializer( maxval=self.store_init_maxval, seed=1), - trainable=False) + trainable=False, + collections=[tf.GraphKeys.LOCAL_VARIABLES]) for i, dim in enumerate(self.dims[1:-1], 1)] self.gradient_stores = [ tf.get_variable('gradient_store_layer_{}'.format(i), [self.max_id + 2, dim], initializer=tf.zeros_initializer(), - trainable=False) + trainable=False, + collections=[tf.GraphKeys.LOCAL_VARIABLES]) for i, dim in enumerate(self.dims[1:-1], 1)] self.store_optimizer = tf.train.AdamOptimizer(self.store_learning_rate) @@ -404,7 +510,8 @@ def _optimize_store(self, node, node_embeddings): embedding_gradient = tf.nn.embedding_lookup(gradient_store, node) with tf.control_dependencies([embedding_gradient]): clear_ops.append( - utils_embedding.embedding_update(gradient_store, node, 0)) + utils_embedding.embedding_update(gradient_store, node, + tf.zeros_like(embedding_gradient))) losses.append(tf.reduce_sum(node_embedding * embedding_gradient)) store_loss = tf.add_n(losses) diff --git a/tf_euler/python/euler_ops/feature_ops.py b/tf_euler/python/euler_ops/feature_ops.py index 8715581..d6fb1eb 100644 --- a/tf_euler/python/euler_ops/feature_ops.py +++ b/tf_euler/python/euler_ops/feature_ops.py @@ -47,7 +47,7 @@ def _get_sparse_feature(nodes_or_edges, feature_ids, op, thread_num, return [tf.sparse_concat(axis=0, sp_inputs=sp, expand_nonconcat_dim=True) for sp in split_sp_transpose] -def get_sparse_feature(nodes, feature_ids, thread_num=1, default_values=None): +def get_sparse_feature(nodes, feature_ids, default_values=None, thread_num=1): """ Fetch sparse features of nodes. 
@@ -65,7 +65,7 @@ def get_sparse_feature(nodes, feature_ids, thread_num=1, default_values=None): base._LIB_OP.get_sparse_feature, thread_num) -def get_edge_sparse_feature(edges, feature_ids, thread_num=1, default_values=None): +def get_edge_sparse_feature(edges, feature_ids, default_values=None, thread_num=1): """ Args: edges: A 2-D `Tensor` of `int64`, with shape `[num_edges, 3]`. diff --git a/tf_euler/python/euler_ops/feature_ops_test.py b/tf_euler/python/euler_ops/feature_ops_test.py index e9b2e97..f50aff0 100644 --- a/tf_euler/python/euler_ops/feature_ops_test.py +++ b/tf_euler/python/euler_ops/feature_ops_test.py @@ -59,7 +59,7 @@ def setUpClass(cls): def testGetNodeSparseFeature(self): """Test get sparse feature for nodes""" - op = ops.get_sparse_feature(tf.constant([1, 2, 3, 4], dtype=tf.int64), [0, 1], 2) + op = ops.get_sparse_feature(tf.constant([1, 2, 3, 4], dtype=tf.int64), [0, 1], None, 2) with tf.Session() as sess: sparse_features = sess.run(op) features = [ diff --git a/tf_euler/python/euler_ops/neighbor_ops.py b/tf_euler/python/euler_ops/neighbor_ops.py index 039f40e..2f623c5 100644 --- a/tf_euler/python/euler_ops/neighbor_ops.py +++ b/tf_euler/python/euler_ops/neighbor_ops.py @@ -120,9 +120,10 @@ def get_multi_hop_neighbor(nodes, edge_types): next_nodes, next_idx = tf.unique(neighbor.values, out_idx=tf.int64) next_indices = tf.stack([neighbor.indices[:, 0], next_idx], 1) next_values = weight.values - next_shape = [tf.size(nodes), tf.size(next_nodes)] - next_adj = tf.sparse.SparseTensor(next_indices, next_values, next_shape) - next_adj = tf.sparse.reorder(next_adj) + next_shape = tf.stack([tf.size(nodes), tf.size(next_nodes)]) + next_shape = tf.cast(next_shape, tf.int64) + next_adj = tf.SparseTensor(next_indices, next_values, next_shape) + next_adj = tf.sparse_reorder(next_adj) nodes_list.append(next_nodes) adj_list.append(next_adj) nodes = next_nodes diff --git a/tf_euler/python/layers.py b/tf_euler/python/layers.py index 229a09c..8954c47 100644 --- a/tf_euler/python/layers.py +++ b/tf_euler/python/layers.py @@ -17,146 +17,8 @@ from __future__ import division from __future__ import print_function -import collections - -import tensorflow as tf - -from tensorflow.python.util import nest - -_LAYER_UIDS = collections.defaultdict(lambda: 0) - - -def get_layer_uid(layer_name=''): - _LAYER_UIDS[layer_name] += 1 - return _LAYER_UIDS[layer_name] - - -class Layer(object): - """ - Layer class modeled after Keras (http://keras.io). - """ - - def __init__(self, name=None, **kwargs): - self.built = False - - if name is None: - layer_name = self.__class__.__name__.lower() - name = layer_name + '_' + str(get_layer_uid(layer_name)) - - self._name = name - - def build(self, input_shape): - self.built = True - - def call(self, inputs): - return inputs - - def __call__(self, inputs): - input_shapes = None - if all(hasattr(x, 'shape') for x in nest.flatten(inputs)): - input_shapes = nest.map_structure(lambda x: x.shape, inputs) - - with tf.variable_scope(self._name): - if not self.built: - self.build(input_shapes) - outputs = self.call(inputs) - return outputs - - def compute_output_shape(self, input_shape): - raise NotImplementedError() - -class Dense(Layer): - """ - Basic full-connected layer. 
- """ - - def __init__(self, - dim, - activation=None, - use_bias=True, - kernel_initializer=lambda: tf.uniform_unit_scaling_initializer(factor=0.36), - bias_initializer=lambda: tf.constant_initializer(value=0.0002), - **kwargs): - super(Dense, self).__init__(**kwargs) - self.dim = dim - self.activation = activation - self.use_bias = use_bias - self.kernel_initializer = kernel_initializer - self.bias_initializer = bias_initializer - - def build(self, input_shape): - input_shape = tf.TensorShape(input_shape) - self.kernel = tf.get_variable( - 'kernel', - shape=[input_shape[-1].value, self.dim], - initializer=self.kernel_initializer()) - if self.use_bias: - self.bias = tf.get_variable( - 'bias', - shape=[self.dim], - initializer=self.bias_initializer()) - else: - self.bias = None - self.built = True - - def call(self, inputs): - rank = inputs.shape.ndims - if rank > 2: - outputs = tf.tensordot(inputs, self.kernel, [[rank - 1], [0]]) - else: - outputs = tf.matmul(inputs, self.kernel) - if self.use_bias: - outputs = tf.nn.bias_add(outputs, self.bias) - if self.activation: - outputs = self.activation(outputs) - return outputs - - -class Embedding(Layer): - """ - Id to dense vector embedding. - """ - - def __init__(self, - max_id, - dim, - initializer=lambda: tf.truncated_normal_initializer(stddev=0.1), - **kwargs): - super(Embedding, self).__init__(**kwargs) - self.max_id = max_id - self.dim = dim - self.initializer = initializer - - def build(self, input_shape): - self.embeddings = tf.get_variable( - 'embeddings', - shape=[self.max_id + 1, self.dim], - initializer=self.initializer()) - self.built = True - - def call(self, inputs): - shape = inputs.shape - inputs = tf.reshape(inputs,[-1]) - output_shape = shape.concatenate(self.dim) - output_shape = [d if d is not None else -1 for d in output_shape.as_list()] - return tf.reshape(tf.nn.embedding_lookup(self.embeddings, inputs),output_shape) - - -class SparseEmbedding(Embedding): - """ - Sparse id to dense vector embedding. 
- """ - def __init__( - self, - max_id, - dim, - initializer=lambda: tf.truncated_normal_initializer(stddev=0.0002), - combiner='sum', - **kwargs): - super(SparseEmbedding, self).__init__( - max_id=max_id, dim=dim, initializer=initializer, **kwargs) - self.combiner = combiner - - def call(self, inputs): - return tf.nn.embedding_lookup_sparse(self.embeddings, inputs, None, - combiner=self.combiner) +from tf_euler.python.base_layers import Layer, Dense, Embedding, SparseEmbedding +try: + from tf_euler.python.ps_layers import HashEmbedding, HashSparseEmbedding +except ImportError: + pass diff --git a/tf_euler/python/models/__init__.py b/tf_euler/python/models/__init__.py index 981d1ba..4beec4c 100644 --- a/tf_euler/python/models/__init__.py +++ b/tf_euler/python/models/__init__.py @@ -17,11 +17,11 @@ from __future__ import division from __future__ import print_function -from tf_euler.python.models.base import SupervisedModel +from tf_euler.python.models.base import SupervisedModel, ModelOutput from tf_euler.python.models.lasgnn import LasGNN from tf_euler.python.models.line import LINE from tf_euler.python.models.node2vec import Node2Vec -from tf_euler.python.models.gcn import GCN, ScalableGCN +from tf_euler.python.models.gcn import SupervisedGCN, ScalableGCN from tf_euler.python.models.graphsage import GraphSage, SupervisedGraphSage, \ ScalableSage from tf_euler.python.models.gat import GAT diff --git a/tf_euler/python/models/base.py b/tf_euler/python/models/base.py index e629c3f..58b4bd2 100644 --- a/tf_euler/python/models/base.py +++ b/tf_euler/python/models/base.py @@ -90,8 +90,8 @@ def decoder(self, embedding, embedding_pos, embedding_negs): labels=tf.zeros_like(neg_logits), logits=neg_logits) loss = tf.reduce_sum(true_xent) + tf.reduce_sum(negative_xent) else: - neg_cost = tf.reduce_logsumexp(neg_logits, axis=2) - loss = tf.reduce_sum(logits - neg_cost) + neg_cost = tf.reduce_logsumexp(neg_logits, axis=2, keepdims=True) + loss = -tf.reduce_sum(logits - neg_cost) return loss, mrr def call(self, inputs): @@ -160,9 +160,8 @@ def decoder(self, embedding, embedding_pos, embedding_negs): labels=tf.zeros_like(neg_logits), logits=neg_logits) loss = tf.reduce_sum(true_xent) + tf.reduce_sum(negative_xent) else: - neg_cost = tf.reduce_logsumexp(neg_logits, axis=2) - loss = tf.reduce_sum(logits - neg_cost) - loss = loss / tf.to_float(tf.size(logits)) + neg_cost = tf.reduce_logsumexp(neg_logits, axis=2, keepdims=True) + loss = -tf.reduce_sum(logits - neg_cost) return loss, mrr def call(self, inputs): diff --git a/tf_euler/python/models/gcn.py b/tf_euler/python/models/gcn.py index a7cf5d8..525bae5 100644 --- a/tf_euler/python/models/gcn.py +++ b/tf_euler/python/models/gcn.py @@ -23,13 +23,21 @@ from tf_euler.python.models import base -class GCN(base.SupervisedModel): +class SupervisedGCN(base.SupervisedModel): + def __init__(self, label_idx, label_dim, metapath, dim, aggregator='mean', - feature_idx=-1, feature_dim=0, use_residual=False, + feature_idx=-1, feature_dim=0, max_id=-1, use_id=False, + sparse_feature_idx=-1, sparse_feature_max_id=-1, + embedding_dim=16, use_hash_embedding=False, use_residual=False, *args, **kwargs): - super(GCN, self).__init__(label_idx, label_dim, *args, **kwargs) + super(SupervisedGCN, self).__init__(label_idx, label_dim, *args, **kwargs) self._encoder = encoders.GCNEncoder( - metapath, dim, aggregator, feature_idx, feature_dim, + metapath, dim, aggregator, + feature_idx=feature_idx, feature_dim=feature_dim, + max_id=max_id, use_id=use_id, + 
sparse_feature_idx=sparse_feature_idx, + sparse_feature_max_id=sparse_feature_max_id, + embedding_dim=embedding_dim, use_hash_embedding=use_hash_embedding, use_residual=use_residual) def encoder(self, inputs): @@ -37,15 +45,22 @@ def encoder(self, inputs): class ScalableGCN(base.SupervisedModel): + def __init__(self, label_idx, label_dim, edge_type, num_layers, dim, - aggregator='mean', feature_idx=-1, feature_dim=0, max_id=-1, - use_residual=False, + aggregator='mean', feature_idx=-1, feature_dim=0, + max_id=-1, use_id=False, + sparse_feature_idx=-1, sparse_feature_max_id=-1, + embedding_dim=16, use_hash_embedding=False, use_residual=False, store_learning_rate=0.001, store_init_maxval=0.05, *args, **kwargs): super(ScalableGCN, self).__init__(label_idx, label_dim, *args, **kwargs) self._encoder = encoders.ScalableGCNEncoder( - edge_type, num_layers, dim, - aggregator, feature_idx, feature_dim, max_id, + edge_type, num_layers, dim, aggregator, + feature_idx=feature_idx, feature_dim=feature_dim, + max_id=max_id, use_id=use_id, + sparse_feature_idx=sparse_feature_idx, + sparse_feature_max_id=sparse_feature_max_id, + embedding_dim=embedding_dim, use_hash_embedding=use_hash_embedding, use_residual=use_residual, store_learning_rate=store_learning_rate, store_init_maxval=store_init_maxval) @@ -63,6 +78,7 @@ def make_session_run_hook(self): class _ScalableGCNHook(tf.train.SessionRunHook): + def __init__(self, scalable_sage_encoder, loss): self._scalable_gcn_encoder = scalable_sage_encoder self._loss = loss diff --git a/tf_euler/python/models/graphsage.py b/tf_euler/python/models/graphsage.py index 6574e22..e22c35e 100644 --- a/tf_euler/python/models/graphsage.py +++ b/tf_euler/python/models/graphsage.py @@ -24,35 +24,30 @@ class GraphSage(base.UnsupervisedModel): - def __init__(self, - node_type, - edge_type, - max_id, - metapath, - fanouts, - dim, - aggregator='mean', - concat=False, - feature_idx=-1, - feature_dim=0, - use_feature=True, - use_id=False, - *args, - **kwargs): - super(GraphSage, self).__init__(node_type, edge_type, max_id, *args, - **kwargs) + def __init__(self, node_type, edge_type, max_id, + metapath, fanouts, dim, aggregator='mean', concat=False, + feature_idx=-1, feature_dim=0, use_feature=None, use_id=False, + sparse_feature_idx=-1, sparse_feature_max_id=-1, + embedding_dim=16, use_hash_embedding=False, use_residual=False, + *args, **kwargs): + super(GraphSage, self).__init__( + node_type, edge_type, max_id, *args, **kwargs) self._target_encoder = encoders.SageEncoder( - metapath, - fanouts, - dim=dim, - aggregator=aggregator, - concat=concat, - feature_idx=feature_idx, - feature_dim=feature_dim,use_feature=use_feature,max_id=max_id+1,use_id=use_id) + metapath, fanouts, dim, aggregator, concat, + feature_idx=feature_idx, feature_dim=feature_dim, + max_id=max_id, use_id=use_id, + sparse_feature_idx=sparse_feature_idx, + sparse_feature_max_id=sparse_feature_max_id, + embedding_dim=embedding_dim, use_hash_embedding=use_hash_embedding, + use_residual=use_residual) self._context_encoder = encoders.SageEncoder( - metapath, fanouts, dim=dim, aggregator=aggregator, concat=concat, - feature_idx=feature_idx, feature_dim=feature_dim,max_id=max_id+1, use_feature=use_feature, - use_id=use_id) + metapath, fanouts, dim, aggregator, concat, + feature_idx=feature_idx, feature_dim=feature_dim, + max_id=max_id, use_id=use_id, + sparse_feature_idx=sparse_feature_idx, + sparse_feature_max_id=sparse_feature_max_id, + embedding_dim=embedding_dim, use_hash_embedding=use_hash_embedding, + 
use_residual=use_residual) def target_encoder(self, inputs): return self._target_encoder(inputs) @@ -62,43 +57,45 @@ def context_encoder(self, inputs): class SupervisedGraphSage(base.SupervisedModel): - def __init__(self, - label_idx, - label_dim, - metapath, - fanouts, - dim, - aggregator='mean', - concat=False, - feature_idx=-1, - feature_dim=0, - *args, - **kwargs): - super(SupervisedGraphSage, self).__init__(label_idx, label_dim, *args, - **kwargs) + def __init__(self, label_idx, label_dim, + metapath, fanouts, dim, aggregator='mean', concat=False, + feature_idx=-1, feature_dim=0, max_id=-1, use_id=False, + sparse_feature_idx=-1, sparse_feature_max_id=-1, + embedding_dim=16, use_hash_embedding=False, use_residual=False, + *args, **kwargs): + super(SupervisedGraphSage, self).__init__( + label_idx, label_dim, *args, **kwargs) self._encoder = encoders.SageEncoder( - metapath, - fanouts, - dim=dim, - aggregator=aggregator, - concat=concat, - feature_idx=feature_idx, - feature_dim=feature_dim) + metapath, fanouts, dim, aggregator, concat, + feature_idx=feature_idx, feature_dim=feature_dim, + max_id=max_id, use_id=use_id, + sparse_feature_idx=sparse_feature_idx, + sparse_feature_max_id=sparse_feature_max_id, + embedding_dim=embedding_dim, use_hash_embedding=use_hash_embedding, + use_residual=use_residual) def encoder(self, inputs): return self._encoder(inputs) class ScalableSage(base.SupervisedModel): - def __init__(self, label_idx, label_dim, edge_type, fanout, num_layers, dim, + def __init__(self, label_idx, label_dim, + edge_type, fanout, num_layers, dim, aggregator='mean', concat=False, - feature_idx=-1, feature_dim=0, max_id=-1, + feature_idx=-1, feature_dim=0, max_id=-1, use_id=False, + sparse_feature_idx=-1, sparse_feature_max_id=-1, + embedding_dim=16, use_hash_embedding=False, use_residual=False, store_learning_rate=0.001, store_init_maxval=0.05, *args, **kwargs): super(ScalableSage, self).__init__(label_idx, label_dim, *args, **kwargs) self._encoder = encoders.ScalableSageEncoder( edge_type, fanout, num_layers, dim, aggregator, concat, - feature_idx=feature_idx, feature_dim=feature_dim, max_id=max_id, + feature_idx=feature_idx, feature_dim=feature_dim, + max_id=max_id, use_id=use_id, + sparse_feature_idx=sparse_feature_idx, + sparse_feature_max_id=sparse_feature_max_id, + embedding_dim=embedding_dim, use_hash_embedding=use_hash_embedding, + use_residual=use_residual, store_learning_rate=store_learning_rate, store_init_maxval=store_init_maxval) diff --git a/tf_euler/python/models/lasgnn.py b/tf_euler/python/models/lasgnn.py index aa78a10..6f7e45d 100644 --- a/tf_euler/python/models/lasgnn.py +++ b/tf_euler/python/models/lasgnn.py @@ -110,7 +110,7 @@ def __init__(self, shared_aggregators=shared_aggregators) for metapath in metapaths_of_group ] for metapaths_of_group in metapaths_of_groups] - self._attention_of_group = [Attention() for _ in metapaths_of_group] + self._attention_of_group = [Attention() for _ in metapaths_of_groups] self._target_feed_forward = layers.Dense(dim) self._context_feed_forward = layers.Dense(dim) diff --git a/tf_euler/python/models/line.py b/tf_euler/python/models/line.py index d8e1706..f1370c0 100644 --- a/tf_euler/python/models/line.py +++ b/tf_euler/python/models/line.py @@ -19,6 +19,7 @@ import tensorflow as tf +from tf_euler.python import encoders from tf_euler.python import euler_ops from tf_euler.python import layers from tf_euler.python.models import base @@ -29,28 +30,41 @@ class LINE(base.UnsupervisedModel): Implementation of LINE model. 
""" - def __init__(self, - node_type, - edge_type, - max_id, - dim, - order=1, - *args, - **kwargs): + def __init__(self, node_type, edge_type, max_id, dim, order=1, + feature_idx=-1, feature_dim=0, use_id=True, + sparse_feature_idx=-1, sparse_feature_max_id=-1, + embedding_dim=16, use_hash_embedding=False, combiner='add', + *args, **kwargs): super(LINE, self).__init__(node_type, edge_type, max_id, *args, **kwargs) - self.target_embedding = layers.Embedding( - name='target_embedding', max_id=max_id + 1, dim=dim) if order == 1: - self.context_embedding = self.target_embedding - elif order == 2: - self.context_embedding = layers.Embedding( - name='context_embedding', max_id=max_id + 1, dim=dim) + order = 'first' + if order == 2: + order = 'second' + + self._target_encoder = encoders.ShallowEncoder( + dim=dim, feature_idx=feature_idx, feature_dim=feature_dim, + max_id=max_id if use_id else -1, + sparse_feature_idx=sparse_feature_idx, + sparse_feature_max_id=sparse_feature_max_id, + embedding_dim=embedding_dim, use_hash_embedding=use_hash_embedding, + combiner=combiner) + if order == 'first': + self._context_encoder = self._target_encoder + elif order == 'second': + self._context_encoder = encoders.ShallowEncoder( + dim=dim, feature_idx=feature_idx, feature_dim=feature_dim, + max_id=max_id if use_id else -1, + sparse_feature_idx=sparse_feature_idx, + sparse_feature_max_id=sparse_feature_max_id, + embedding_dim=embedding_dim, use_hash_embedding=use_hash_embedding, + combiner=combiner) else: - raise ValueError('LINE order must be 1 or 2, got {}:'.format(order)) + raise ValueError('LINE order must be one of 1, 2, "first", or "second"' + 'got {}:'.format(order)) def target_encoder(self, inputs): - return self.target_embedding(inputs) + return self._target_encoder(inputs) def context_encoder(self, inputs): - return self.context_embedding(inputs) + return self._context_encoder(inputs) diff --git a/tf_euler/python/models/lshne.py b/tf_euler/python/models/lshne.py index 8281567..34a3ea3 100644 --- a/tf_euler/python/models/lshne.py +++ b/tf_euler/python/models/lshne.py @@ -31,6 +31,7 @@ class LsHNE(base.UnsupervisedModel): def __init__(self, node_type, path_patterns, max_id, dim, sparse_feature_dims, feature_ids,feature_embedding_dim=16, walk_len=3, left_win_size=1, right_win_size=1, num_negs=5, gamma=5, + src_type_num=20, *args, **kwargs): super(LsHNE, self).__init__(node_type, path_patterns, max_id, *args, **kwargs) @@ -43,10 +44,11 @@ def __init__(self, node_type, path_patterns, max_id, dim, self.right_win_size = right_win_size self.num_negs = num_negs self.view_num = len(path_patterns) + self.src_type_num = src_type_num if self.view_num<1: - raise ValueError('View Number must be bigger than 1, got{}'.format(self.view_num)) + raise ValueError('View Number must be bigger than or equal 1, got{}'.format(self.view_num)) if not isinstance(sparse_feature_dims, list): - raise TypeError('Expect list for sparse feature dimsgot {}.'.format( + raise TypeError('Expect list for sparse feature dims, got {}.'.format( type(sparse_feature_dims).__name__)) self.sparse_feature_dims = sparse_feature_dims self.feature_ids = feature_ids @@ -57,14 +59,22 @@ def __init__(self, node_type, path_patterns, max_id, dim, self.feature_embedding_layer.append(layers.SparseEmbedding(d,feature_embedding_dim, combiner="sum")) - self.hidden_layer =[{}] * self.view_num + self.hidden_layer =[{} for i in range(self.view_num)] for i in range(0, self.view_num): - self.hidden_layer[i]['src'] = layers.Dense(256) - self.hidden_layer[i]['tar'] 
= layers.Dense(256) - self.out_layer = [{}] * self.view_num + self.hidden_layer[i]['src'] = [] + self.hidden_layer[i]['tar'] = [] + for j in range(0,self.src_type_num): + self.hidden_layer[i]['src'].append(layers.Dense(256)) + if i == 0: + self.hidden_layer[i]['tar'].append(layers.Dense(256)) + self.out_layer = [{} for i in range(self.view_num)] for i in range(0, self.view_num): - self.out_layer[i]['src'] = layers.Dense(self.dim) - self.out_layer[i]['tar'] = layers.Dense(self.dim) + self.out_layer[i]['src'] = [] + self.out_layer[i]['tar'] = [] + for j in range(0,self.src_type_num): + self.out_layer[i]['src'].append(layers.Dense(self.dim)) + if i == 0: + self.out_layer[i]['tar'].append(layers.Dense(self.dim)) self.att_vec = tf.get_variable( 'att_vec', @@ -102,17 +112,29 @@ def to_sample(self, inputs, view): def source_encoder(self, inputs, view): raw_emb = self.feature_embedding_lookup(inputs) - embedding = self.id_dnn_net(raw_emb, 'src', view) + node_type = euler_ops.get_node_type(inputs) + embedding = self.id_dnn_net(raw_emb, 'src', view, node_type) return embedding def context_encoder(self, inputs, view): raw_emb = self.feature_embedding_lookup(inputs) - embedding = self.id_dnn_net(raw_emb, 'tar', view) + node_type = euler_ops.get_node_type(inputs) + embedding = self.id_dnn_net(raw_emb, 'tar', view, node_type) return embedding - def id_dnn_net(self, inputs, name, view): - hidden = self.hidden_layer[view][name](inputs) - out = self.out_layer[view][name](hidden) + def id_dnn_net(self, inputs, name, view, node_type=None): + if name == 'tar': + view = 0 + node_type_vec = tf.reshape(tf.one_hot(node_type, self.src_type_num) + ,[-1, self.src_type_num, 1]) + hiddens = [tf.expand_dims(layer(inputs), 1) + for layer in self.hidden_layer[view][name]] + hiddens = [tf.reshape(hidden, [-1, 256]) + for hidden in hiddens] + out = tf.concat([tf.expand_dims(self.out_layer[view][name][i](hiddens[i]), 1) + for i in range(self.src_type_num)], 1) + out = tf.matmul(out, node_type_vec, transpose_a=True) + out = tf.reshape(out,[-1, self.dim]) return out def decoder(self, embedding, embedding_pos, embedding_negs): @@ -170,6 +192,8 @@ def call(self, inputs): loss = tf.reduce_sum(single_view_loss)+tf.reduce_sum(multi_view_loss) mrr = tf.reduce_mean(mrr_view) + embedding = self.get_att_embedding(None, inputs, -1) + embedding.set_shape([None, self.dim]) return ModelOutput( embedding=embedding, loss=loss, metric_name='mrr', metric=mrr) diff --git a/tf_euler/python/models/node2vec.py b/tf_euler/python/models/node2vec.py index 32c49ae..3769b7a 100644 --- a/tf_euler/python/models/node2vec.py +++ b/tf_euler/python/models/node2vec.py @@ -19,6 +19,7 @@ import tensorflow as tf +from tf_euler.python import encoders from tf_euler.python import euler_ops from tf_euler.python import layers from tf_euler.python.models import base @@ -28,21 +29,15 @@ class Node2Vec(base.UnsupervisedModel): """ """ - def __init__(self, - node_type, - edge_type, - max_id, - dim, - walk_len=3, - walk_p=1, - walk_q=1, - left_win_size=1, - right_win_size=1, - num_negs=5, - *args, - **kwargs): - super(Node2Vec, self).__init__(node_type, edge_type, max_id, *args, - **kwargs) + def __init__(self, node_type, edge_type, max_id, + dim, walk_len=3, walk_p=1, walk_q=1, + left_win_size=1, right_win_size=1, num_negs=5, + feature_idx=-1, feature_dim=0, use_id=True, + sparse_feature_idx=-1, sparse_feature_max_id=-1, + embedding_dim=16, use_hash_embedding=False, combiner='add', + *args, **kwargs): + super(Node2Vec, self).__init__( + node_type, edge_type, max_id, 
*args, **kwargs) self.node_type = node_type self.edge_type = edge_type self.max_id = max_id @@ -55,13 +50,23 @@ def __init__(self, self.num_negs = num_negs self.batch_size_ratio = \ - euler_ops.gen_pair(tf.zeros([0, walk_len + 1], dtype=tf.int64), - left_win_size, right_win_size).shape[1] + int(euler_ops.gen_pair(tf.zeros([0, walk_len + 1], dtype=tf.int64), + left_win_size, right_win_size).shape[1]) - self.target_embedding = layers.Embedding( - name='target_embedding', max_id=max_id + 1, dim=dim) - self.context_embedding = layers.Embedding( - name='context_embedding', max_id=max_id + 1, dim=dim) + self._target_encoder = encoders.ShallowEncoder( + dim=dim, feature_idx=feature_idx, feature_dim=feature_dim, + max_id=max_id if use_id else -1, + sparse_feature_idx=sparse_feature_idx, + sparse_feature_max_id=sparse_feature_max_id, + embedding_dim=embedding_dim, use_hash_embedding=use_hash_embedding, + combiner=combiner) + self._context_encoder = encoders.ShallowEncoder( + dim=dim, feature_idx=feature_idx, feature_dim=feature_dim, + max_id=max_id if use_id else -1, + sparse_feature_idx=sparse_feature_idx, + sparse_feature_max_id=sparse_feature_max_id, + embedding_dim=embedding_dim, use_hash_embedding=use_hash_embedding, + combiner=combiner) def to_sample(self, inputs): batch_size = tf.size(inputs) @@ -72,7 +77,6 @@ def to_sample(self, inputs): default_node=self.max_id + 1) pair = euler_ops.gen_pair(path, self.left_win_size, self.right_win_size) num_pairs = pair.shape[1] - print(num_pairs) src, pos = tf.split(pair, [1, 1], axis=-1) src = tf.reshape(src, [batch_size * num_pairs, 1]) pos = tf.reshape(pos, [batch_size * num_pairs, 1]) @@ -82,7 +86,7 @@ def to_sample(self, inputs): return src, pos, negs def target_encoder(self, inputs): - return self.target_embedding(inputs) + return self._target_encoder(inputs) def context_encoder(self, inputs): - return self.context_embedding(inputs) + return self._context_encoder(inputs) diff --git a/tf_euler/python/optimizers.py b/tf_euler/python/optimizers.py index 4c5de9a..43df12b 100644 --- a/tf_euler/python/optimizers.py +++ b/tf_euler/python/optimizers.py @@ -20,7 +20,9 @@ import tensorflow as tf optimizers = { - 'sgd': tf.train.GradientDescentOptimizer, + 'sgd': lambda lr: tf.train.MomentumOptimizer(lr, 0.0), + 'momentum': lambda lr: tf.train.MomentumOptimizer(lr, 0.9), + 'adagrad': tf.train.AdagradOptimizer, 'adam': tf.train.AdamOptimizer } diff --git a/tf_euler/python/run_loop.py b/tf_euler/python/run_loop.py index 265e3a8..4cb63d1 100644 --- a/tf_euler/python/run_loop.py +++ b/tf_euler/python/run_loop.py @@ -42,7 +42,7 @@ def define_network_embedding_flags(): tf.flags.DEFINE_integer('all_node_type', euler_ops.ALL_NODE_TYPE, 'Node type of the whole graph.') tf.flags.DEFINE_list('train_edge_type', [0], 'Edge type of training set.') - tf.flags.DEFINE_list('all_edge_type', [0, 1], + tf.flags.DEFINE_list('all_edge_type', [0, 1, 2], 'Edge type of the whole graph.') tf.flags.DEFINE_integer('max_id', -1, 'Max node id.') tf.flags.DEFINE_integer('feature_idx', -1, 'Feature index.') @@ -58,11 +58,11 @@ def define_network_embedding_flags(): tf.flags.DEFINE_integer('dim', 256, 'Dimension of embedding.') tf.flags.DEFINE_integer('num_negs', 5, 'Number of negative samplings.') tf.flags.DEFINE_integer('order', 1, 'LINE order.') - tf.flags.DEFINE_integer('walk_len', 3, 'Length of random walk path.') + tf.flags.DEFINE_integer('walk_len', 5, 'Length of random walk path.') tf.flags.DEFINE_float('walk_p', 1., 'Node2Vec return parameter.') tf.flags.DEFINE_float('walk_q', 1., 
'Node2Vec in-out parameter.') - tf.flags.DEFINE_integer('left_win_size', 1, 'Left window size.') - tf.flags.DEFINE_integer('right_win_size', 1, 'Right window size.') + tf.flags.DEFINE_integer('left_win_size', 5, 'Left window size.') + tf.flags.DEFINE_integer('right_win_size', 5, 'Right window size.') tf.flags.DEFINE_list('fanouts', [10, 10], 'GCN fanouts.') tf.flags.DEFINE_enum('aggregator', 'mean', ['gcn', 'mean', 'meanpool', 'maxpool', 'attention'], @@ -106,7 +106,7 @@ def run_train(model, flags_obj, master, is_chief): _, loss, metric_name, metric = model(source) optimizer_class = optimizers.get(flags_obj.optimizer) - optimizer = optimizer_class(learning_rate=flags_obj.learning_rate) + optimizer = optimizer_class(flags_obj.learning_rate) global_step = tf.train.get_or_create_global_step() train_op = optimizer.minimize(loss, global_step=global_step) @@ -117,7 +117,8 @@ def run_train(model, flags_obj, master, is_chief): tf.train.LoggingTensorHook( tensor_to_log, every_n_iter=flags_obj.log_steps)) - num_steps = int((flags_obj.max_id + 1) // batch_size * flags_obj.num_epochs) + num_steps = int((flags_obj.max_id + 1) // flags_obj.batch_size * + flags_obj.num_epochs) hooks.append(tf.train.StopAtStepHook(last_step=num_steps)) if len(flags_obj.worker_hosts) == 0 or flags_obj.task_index == 1: @@ -205,7 +206,7 @@ def run_network_embedding(flags_obj, master, is_chief): num_negs=flags_obj.num_negs, order=flags_obj.order) - elif flags_obj.model == 'randomwalk': + elif flags_obj.model in ['randomwalk', 'deepwalk', 'node2vec']: model = models.Node2Vec( node_type=flags_obj.all_node_type, edge_type=flags_obj.all_edge_type, @@ -219,8 +220,8 @@ def run_network_embedding(flags_obj, master, is_chief): left_win_size=flags_obj.left_win_size, right_win_size=flags_obj.right_win_size) - elif flags_obj.model == 'gcn': - model = models.GCN( + elif flags_obj.model in ['gcn', 'gcn_supervised']: + model = models.SupervisedGCN( label_idx=flags_obj.label_idx, label_dim=flags_obj.label_dim, num_classes=flags_obj.num_classes, @@ -304,7 +305,7 @@ def run_network_embedding(flags_obj, master, is_chief): nb_num=5) elif flags_obj.model == 'lshne': - model = models.LsHNE(-1,[[0,0,0],[0,0,0]],-1,128,[1,1],[1,1]) + model = models.LsHNE(-1,[[[0,0,0],[0,0,0]]],-1,128,[1,1],[1,1]) elif flags_obj.model == 'saved_embedding': embedding_val = np.load(os.path.join(flags_obj.model_dir, 'embedding.npy')) diff --git a/tf_euler/python/sparse_aggregators.py b/tf_euler/python/sparse_aggregators.py index 18ef370..88282d8 100644 --- a/tf_euler/python/sparse_aggregators.py +++ b/tf_euler/python/sparse_aggregators.py @@ -28,6 +28,12 @@ def _sparse_ones_like(sp_tensor): sp_tensor.dense_shape) +def _sparse_eye(num_rows, dtype=tf.float32): + return tf.SparseTensor( + tf.stack([tf.range(num_rows)] * 2, axis=1), + tf.ones(num_rows, dtype), tf.stack([num_rows] * 2)) + + class GCNAggregator(layers.Layer): def __init__(self, dim, activation=tf.nn.relu, renorm=False, **kwargs): @@ -39,8 +45,8 @@ def call(self, inputs): self_embedding, neigh_embedding, adj = inputs adj = _sparse_ones_like(adj) - degree = tf.reshape(tf.sparse.reduce_sum(adj, 1), [-1, 1]) - agg_embedding = tf.sparse.matmul(adj, neigh_embedding) + degree = tf.reshape(tf.sparse_reduce_sum(adj, 1), [-1, 1]) + agg_embedding = tf.sparse_tensor_dense_matmul(adj, neigh_embedding) if self.renorm: agg_embedding = (self_embedding + agg_embedding) / (1. 
+ degree) else: @@ -62,8 +68,8 @@ def call(self, inputs): self_embedding, neigh_embedding, adj = inputs adj = _sparse_ones_like(adj) - degree = tf.reshape(tf.sparse.reduce_sum(adj, 1), [-1, 1]) - agg_embedding = tf.sparse.matmul(adj, neigh_embedding) / \ + degree = tf.reshape(tf.sparse_reduce_sum(adj, 1), [-1, 1]) + agg_embedding = tf.sparse_tensor_dense_matmul(adj, neigh_embedding) / \ tf.maximum(degree, 1e-7) from_self = self.self_layer(self_embedding) @@ -88,8 +94,8 @@ def call(self, inputs): self_embedding, neigh_embedding, adj = inputs adj = _sparse_ones_like(adj) if self.renorm: - eye = tf.sparse.eye(adj.dense_shape[0]) - adj = tf.sparse.concat(1, [eye, adj]) + eye = _sparse_eye(adj.dense_shape[0]) + adj = tf.sparse_concat(1, [eye, adj]) if not self.renorm: from_all = self.dense(neigh_embedding) @@ -101,14 +107,14 @@ def call(self, inputs): self_weight = self.self_layer(from_self) all_weight = self.neigh_layer(from_all) - coefficient = tf.sparse.add(adj * self_weight, + coefficient = tf.sparse_add(adj * self_weight, adj * tf.reshape(all_weight, [1, -1])) coefficient = tf.SparseTensor( coefficient.indices, tf.nn.leaky_relu(coefficient.values), coefficient.dense_shape) - coefficient = tf.sparse.softmax(coefficient) + coefficient = tf.sparse_softmax(coefficient) - output = tf.sparse.matmul(coefficient, from_all) + output = tf.sparse_tensor_dense_matmul(coefficient, from_all) if not self.renorm: output = from_self + output if self.activation: @@ -118,10 +124,11 @@ def call(self, inputs): class AttentionAggregator(layers.Layer): - def __init__(self, dim, num_heads=4, activation=tf.nn.relu, **kwargs): + def __init__(self, dim, num_heads=4, activation=tf.nn.relu, renorm=False, + **kwargs): super(AttentionAggregator, self).__init__(**kwargs) dim //= num_heads - self.attentions = [SingleAttentionAggregator(dim, activation) + self.attentions = [SingleAttentionAggregator(dim, activation, renorm) for _ in range(num_heads)] def call(self, inputs): diff --git a/tools/pip/setup.py b/tools/pip/setup.py index 4005282..ebe9e87 100644 --- a/tools/pip/setup.py +++ b/tools/pip/setup.py @@ -39,7 +39,7 @@ def finalize_options(self): setuptools.setup(name='euler-gl', - version='0.1.0', + version='0.1.2', description='A toolset for network representation learning.', url='https://github.com/alibaba/euler', author='Alibaba Group Holding Limited',
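
For reference, the updated optimizer table in tf_euler/python/optimizers.py resolves a flag name to a TF 1.x optimizer constructor that run_loop.py now calls positionally with the learning rate. A minimal sketch of how that resolution behaves, assuming TensorFlow 1.x (illustrative only, not part of the patch):

import tensorflow as tf

# Same mapping as the patched optimizers.py: plain SGD is expressed as a
# zero-momentum MomentumOptimizer so every entry is callable with a single
# positional learning-rate argument.
optimizers = {
    'sgd': lambda lr: tf.train.MomentumOptimizer(lr, 0.0),
    'momentum': lambda lr: tf.train.MomentumOptimizer(lr, 0.9),
    'adagrad': tf.train.AdagradOptimizer,
    'adam': tf.train.AdamOptimizer,
}

# run_loop.run_train now does: optimizer_class(flags_obj.learning_rate)
optimizer = optimizers['momentum'](0.01)  # MomentumOptimizer with momentum 0.9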