Commit

written cf-examples
vishesh9131 committed Oct 31, 2024
1 parent 94b865e commit 5d83179
Showing 48 changed files with 7,928 additions and 44 deletions.
@@ -1,5 +1,8 @@
# item_profiling implementation
from typing import List, Dict, Any
import logging

logger = logging.getLogger(__name__)

class ItemProfilingRecommender:
def __init__(self):
@@ -24,17 +27,21 @@ def fit(self, data: Dict[int, List[int]], item_features: Dict[int, Dict[str, Any
for feature, value in item_features.get(item_id, {}).items():
self.item_profiles[item_id][feature] = self.item_profiles[item_id].get(feature, 0) + 1

def recommend(self, item_indices: List[int], top_n: int = 10) -> List[int]:
def recommend(self, query: str, top_n: int = 10) -> List[int]:
"""
Generate top-N item recommendations based on item profiles.
Recommend items based on the similarity of the query to the documents.
Parameters:
- item_indices (List[int]): List of item indices to base recommendations on.
- top_n (int): The number of recommendations to generate.
- query (str): The query text for which to generate recommendations.
- top_n (int): Number of top recommendations to return.
Returns:
- List[int]: List of recommended item indices.
"""
# Placeholder implementation
# Implement similarity-based recommendations or other logic as needed
return []
logger.info("Generating recommendations using LSA.")
# Assumes the class also provides a fitted vectorizer, an LSA model, and the
# original document collection (self.documents); none of these appear in this hunk.
query_vec = self.transform([query])
doc_vecs = self.lsa_model.transform(self.vectorizer.transform(self.documents))
similarity_scores = (doc_vecs @ query_vec.T).flatten()
top_indices = similarity_scores.argsort()[::-1][:top_n]
logger.info(f"Top {top_n} recommendations generated using LSA.")
return top_indices.tolist()
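
For context, a minimal usage sketch of the updated recommend API. The fit() arguments are illustrative, and the vectorizer, LSA model, and document collection referenced in recommend() are assumed to be wired up elsewhere in the class, since they are not shown in this hunk:

# Hypothetical usage; input data and item features are made up for illustration.
recommender = ItemProfilingRecommender()
recommender.fit(
    data={1: [10, 11], 2: [12]},
    item_features={10: {"genre": "sci-fi"}, 11: {"genre": "drama"}, 12: {"genre": "sci-fi"}},
)
top_items = recommender.recommend("space exploration drama", top_n=5)
print(top_items)  # indices of the most similar documents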
@@ -1,3 +1,3 @@
from .rule_based import RULE_BASED as OTH_RULE_BASED
from .ontology_based import ONTOLOGY_BASED as OTH_ONTOLOGY_BASED
from .sentiment_analysis import SENTIMENT_ANALYSIS as OTH_SENTIMENT_ANALYSIS
from .rule_based import RuleBasedFilter as OTH_RULE_BASED
from .ontology_based import OntologyBasedFilter as OTH_ONTOLOGY_BASED
from .sentiment_analysis import SentimentAnalysisFilter as OTH_SENTIMENT_ANALYSIS
@@ -1,2 +1,70 @@
# ontology_based implementation
pass
from owlready2 import get_ontology

class OntologyBasedFilter:
def __init__(self, ontology_path):
"""
Initializes the OntologyBasedFilter with a specific ontology.
Parameters:
- ontology_path (str): The file path to the ontology (.owl) file.
"""
try:
self.ontology = get_ontology(ontology_path).load()
except Exception as e:
raise ValueError(f"Failed to load ontology from {ontology_path}: {e}")

def get_concepts(self, content):
"""
Extracts concepts from the content based on the ontology.
Parameters:
- content (str): The content to extract concepts from.
Returns:
- set: A set of concepts identified in the content.
"""
concepts_found = set()
content_lower = content.lower()

for cls in self.ontology.classes():
if cls.name.lower() in content_lower:
concepts_found.add(cls.name)

return concepts_found

def filter_content(self, content):
"""
Filters the content based on ontology-defined relationships.
Parameters:
- content (str): The content to be filtered.
Returns:
- dict: A dictionary with 'status' and 'related_concepts'.
"""
concepts = self.get_concepts(content)
related_concepts = self.find_related_concepts(concepts)

if related_concepts:
return {'status': 'filtered', 'related_concepts': related_concepts}
else:
return {'status': 'allowed', 'related_concepts': related_concepts}

def find_related_concepts(self, concepts):
"""
Finds related concepts within the ontology.
Parameters:
- concepts (set): A set of concepts to find relationships for.
Returns:
- dict: A dictionary mapping each concept to its related concepts.
"""
related = {}
for concept in concepts:
try:
cls = self.ontology[concept]
related[concept] = [str(rel) for rel in cls.is_a]
except KeyError:
related[concept] = []
return related
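
A short usage sketch for the new ontology filter; the ontology file name and the example text are placeholders:

# Hypothetical usage; 'media.owl' and the sample sentence are placeholders.
ontology_filter = OntologyBasedFilter("media.owl")
result = ontology_filter.filter_content("An article about violence in video games.")
print(result['status'])             # 'filtered' if any ontology class name appears in the text
print(result['related_concepts'])   # parent classes (is_a) for each matched concept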
@@ -1,2 +1,49 @@
# rule_based implementation
pass
class RuleBasedFilter:
def __init__(self, rules=None):
"""
Initializes the RuleBasedFilter with a set of rules.
Parameters:
- rules (list of dict): A list where each rule is a dictionary containing
'keyword' and 'action' keys.
"""
if rules is None:
self.rules = []
else:
self.rules = rules

def add_rule(self, keyword, action):
"""
Adds a new rule to the filter.
Parameters:
- keyword (str): The keyword to look for in the content.
- action (str): The action to take ('block', 'flag', etc.).
"""
rule = {'keyword': keyword.lower(), 'action': action.lower()}
self.rules.append(rule)

def filter_content(self, content):
"""
Filters the content based on the predefined rules.
Parameters:
- content (str): The content to be filtered.
Returns:
- dict: A dictionary with 'status' and 'actions' applied.
"""
actions_applied = []
content_lower = content.lower()

for rule in self.rules:
if rule['keyword'] in content_lower:
actions_applied.append(rule['action'])

if 'block' in actions_applied:
return {'status': 'blocked', 'actions': actions_applied}
elif 'flag' in actions_applied:
return {'status': 'flagged', 'actions': actions_applied}
else:
return {'status': 'allowed', 'actions': actions_applied}
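
A brief usage sketch for the rule-based filter; the keywords and messages are illustrative:

# Example usage with made-up rules.
rule_filter = RuleBasedFilter()
rule_filter.add_rule("spam", "block")
rule_filter.add_rule("promo", "flag")
print(rule_filter.filter_content("Limited promo offer!"))  # {'status': 'flagged', 'actions': ['flag']}
print(rule_filter.filter_content("This is spam."))         # {'status': 'blocked', 'actions': ['block']}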
@@ -1,2 +1,45 @@
# sentiment_analysis implementation
pass
from textblob import TextBlob

class SentimentAnalysisFilter:
def __init__(self, threshold=0.1):
"""
Initializes the SentimentAnalysisFilter.
Parameters:
- threshold (float): The polarity magnitude used to classify content:
scores above +threshold are labelled positive, scores below -threshold
are labelled negative, and anything in between is treated as neutral.
"""
self.threshold = threshold

def analyze_sentiment(self, content):
"""
Analyzes the sentiment of the given content.
Parameters:
- content (str): The content to analyze.
Returns:
- float: The sentiment polarity score ranging from -1.0 to 1.0.
"""
blob = TextBlob(content)
return blob.sentiment.polarity

def filter_content(self, content):
"""
Filters the content based on its sentiment.
Parameters:
- content (str): The content to be filtered.
Returns:
- dict: A dictionary with 'status' and 'sentiment_score'.
"""
sentiment_score = self.analyze_sentiment(content)

if sentiment_score < -self.threshold:
return {'status': 'negative', 'sentiment_score': sentiment_score}
elif sentiment_score > self.threshold:
return {'status': 'positive', 'sentiment_score': sentiment_score}
else:
return {'status': 'neutral', 'sentiment_score': sentiment_score}
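
A brief usage sketch; the sample sentences are illustrative and the polarity scores shown are approximate, since they depend on TextBlob's lexicon:

# Example usage; requires TextBlob and its corpora to be installed.
sentiment_filter = SentimentAnalysisFilter(threshold=0.1)
print(sentiment_filter.filter_content("I absolutely love this product!"))
# e.g. {'status': 'positive', 'sentiment_score': 0.6}
print(sentiment_filter.filter_content("The package arrived on Tuesday."))
# e.g. {'status': 'neutral', 'sentiment_score': 0.0}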
@@ -1,3 +1,3 @@
from .scalable_algorithms import SCALABLE_ALGORITHMS as PER_SCALABLE_ALGORITHMS
from .feature_extraction import FEATURE_EXTRACTION as PER_FEATURE_EXTRACTION
from .load_balancing import LOAD_BALANCING as PER_LOAD_BALANCING
from .scalable_algorithms import ScalableAlgorithms as PER_SCALABLE_ALGORITHMS
from .feature_extraction import FeatureExtraction as PER_FEATURE_EXTRACTION
from .load_balancing import LoadBalancing as PER_LOAD_BALANCING
@@ -1,2 +1,86 @@
# feature_extraction implementation
pass
import logging
from sklearn.feature_extraction.text import TfidfVectorizer
from nltk.stem import WordNetLemmatizer
import nltk
from typing import List, Any

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Download NLTK resources if not already present
nltk.download('punkt', quiet=True)
nltk.download('wordnet', quiet=True)

class FeatureExtraction:
def __init__(self, max_features=5000):
"""
Initializes the FeatureExtraction with a TF-IDF vectorizer.
Parameters:
- max_features (int): The maximum number of features (vocabulary size).
"""
self.max_features = max_features
self.lemmatizer = WordNetLemmatizer()
self.vectorizer = TfidfVectorizer(
max_features=self.max_features,
stop_words='english', # Use built-in stop words
tokenizer=self.tokenize
)
logger.info(f"FeatureExtraction initialized with max_features={self.max_features}.")

def tokenize(self, text: str) -> List[str]:
"""
Tokenizes and lemmatizes the input text.
Parameters:
- text (str): The text to tokenize.
Returns:
- list: A list of processed tokens.
"""
tokens = nltk.word_tokenize(text.lower())
lemmatized = [
self.lemmatizer.lemmatize(token)
for token in tokens
if token.isalpha()
]
logger.debug(f"Tokenized text: {lemmatized}")
return lemmatized

def fit_transform(self, documents: List[str]):
"""
Fits the TF-IDF vectorizer on the documents and transforms them into feature vectors.
Parameters:
- documents (list of str): The list of documents to process.
Returns:
- sparse matrix: The TF-IDF feature matrix.
"""
logger.info("Fitting and transforming documents into TF-IDF features.")
return self.vectorizer.fit_transform(documents)

def transform(self, documents: List[str]) -> Any:
"""
Transforms the documents into TF-IDF feature vectors using the already fitted vectorizer.
Parameters:
- documents (list of str): The list of documents to transform.
Returns:
- sparse matrix: The TF-IDF feature matrix.
"""
logger.info("Transforming documents into TF-IDF features.")
# Use transform (not fit_transform) so the already-fitted vocabulary is reused.
return self.vectorizer.transform(documents)

def get_feature_names(self) -> List[str]:
"""
Retrieves the feature names (vocabulary) from the vectorizer.
Returns:
- list: A list of feature names.
"""
return self.vectorizer.get_feature_names_out()
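
A minimal usage sketch on a toy corpus; the documents are made up for illustration:

# Example usage; the corpus is illustrative only.
docs = ["Cats chase mice.", "Dogs chase cats.", "Mice eat cheese."]
extractor = FeatureExtraction(max_features=100)
tfidf_matrix = extractor.fit_transform(docs)      # sparse matrix of shape (3, vocab_size)
print(extractor.get_feature_names())              # lemmatized vocabulary terms
print(extractor.transform(["Dogs eat cheese."]))  # reuses the fitted vocabulary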
@@ -1,2 +1,86 @@
# load_balancing implementation
pass
import logging
from queue import Queue
from threading import Thread
import time
import threading
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class LoadBalancing:
def __init__(self, num_workers=4):
"""
Initializes the LoadBalancing with a specified number of worker threads.
Parameters:
- num_workers (int): The number of worker threads to spawn.
"""
self.num_workers = num_workers
self.task_queue = Queue()
self.results = []
self.threads = []
self._init_workers()
logger.info(f"LoadBalancing initialized with {self.num_workers} workers.")

def _init_workers(self):
"""
Initializes worker threads that continuously process tasks from the queue.
"""
for i in range(self.num_workers):
thread = Thread(target=self._worker, name=f"Worker-{i+1}", daemon=True)
thread.start()
self.threads.append(thread)
logger.debug(f"Started {thread.name}.")

def _worker(self):
"""
Worker thread that processes tasks from the queue.
"""
while True:
func, args, kwargs = self.task_queue.get()
if func is None:
# Sentinel found, terminate the thread
logger.debug(f"{threading.current_thread().name} received sentinel. Exiting.")
break
try:
result = func(*args, **kwargs)
self.results.append(result)
logger.debug(f"{threading.current_thread().name} processed a task with result: {result}")
except Exception as e:
logger.error(f"Error processing task: {e}")
finally:
self.task_queue.task_done()

def add_task(self, func, *args, **kwargs):
"""
Adds a new task to the queue.
Parameters:
- func (callable): The function to execute.
- *args: Positional arguments for the function.
- **kwargs: Keyword arguments for the function.
"""
self.task_queue.put((func, args, kwargs))
logger.debug(f"Added task {func.__name__} to the queue.")

def get_results(self):
"""
Waits for all tasks to be processed and returns the results.
Returns:
- list: A list of results from all tasks.
"""
self.task_queue.join()
return self.results

def shutdown(self):
"""
Shuts down all worker threads gracefully by sending sentinel tasks.
"""
for _ in self.threads:
self.task_queue.put((None, (), {})) # Sentinel
for thread in self.threads:
thread.join()
logger.debug(f"{thread.name} has terminated.")
logger.info("LoadBalancing has been shutdown.")