Skip to content

Commit

Permalink
Merge pull request #279 from abstractqqq/str_lcs
Browse files Browse the repository at this point in the history
added lcs_seq
  • Loading branch information
abstractqqq authored Oct 28, 2024
2 parents 3f0fe7b + 02193bd commit e34c8fc
Show file tree
Hide file tree
Showing 8 changed files with 315 additions and 260 deletions.
40 changes: 39 additions & 1 deletion python/polars_ds/string.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"str_d_leven",
"str_leven",
"str_osa",
"str_lcs_seq",
"str_fuzz",
"similar_to_vocab",
"extract_numbers",
Expand Down Expand Up @@ -565,6 +566,42 @@ def str_leven(
)


def str_lcs_seq(
    c: str | pl.Expr,
    other: str | pl.Expr,
    parallel: bool = False,
    return_sim: bool = True,
) -> pl.Expr:
    """
    Computes the Longest Common Subsequence distance/similarity between this and the other str.

    The distance is calculated as max(len1, len2) - similarity, where the similarity is the
    length of the longest common subsequence. Subsequence characters need not occupy
    consecutive positions.

    Parameters
    ----------
    c
        The string column
    other
        Either the name of the column or a Polars expression. If you want to compare a single
        string with all of column c, use pl.lit(your_str)
    parallel
        Whether to run the comparisons in parallel. Note that this is only recommended when this query
        is the only one in the context and we are not in any aggregation context.
    return_sim
        If true, return normalized similarity.
    """
    # Both variants take identical arguments; only the plugin symbol differs.
    symbol = "pl_lcs_seq_sim" if return_sim else "pl_lcs_seq"
    return pl_plugin(
        symbol=symbol,
        args=[str_to_expr(c), str_to_expr(other), pl.lit(parallel, pl.Boolean)],
    )


def str_osa(
c: str | pl.Expr,
other: str | pl.Expr,
Expand Down Expand Up @@ -601,7 +638,8 @@ def str_osa(

def str_fuzz(c: str | pl.Expr, other: str | pl.Expr, parallel: bool = False) -> pl.Expr:
"""
A string similarity based on Longest Common Subsequence.
Calculates the normalized Indel similarity. (See the package rapidfuzz, fuzz.ratio for more
information.)
Parameters
----------
Expand Down
152 changes: 152 additions & 0 deletions src/str_ext/generic_str_distancer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/// Polars Series-wise generic str distancers
use rapidfuzz::distance::{
lcs_seq,
osa,
levenshtein,
damerau_levenshtein,
jaro
};
use polars::{prelude::{arity::binary_elementwise_values, DataType, Float64Chunked, Series, StringChunked, UInt32Chunked}, series::IntoSeries};
use pyo3_polars::export::polars_core::{
utils::rayon::prelude::{IntoParallelIterator, ParallelIterator},
POOL,
};

use crate::utils::split_offsets;

/// Common facade over the rapidfuzz `BatchComparator` types so that the
/// Series-level helpers below can be written once for every string metric.
/// Implementors hold a pre-compiled reference string and compare it against
/// each incoming `&str`.
pub trait StdBatchedStrDistancer {
    /// Edit distance between the pre-compiled reference string and `s`.
    fn distance(&self, s: &str) -> u32;
    /// Similarity between the reference string and `s`, normalized by
    /// rapidfuzz (higher means more similar).
    fn normalized_similarity(&self, s:&str) -> f64;
}

// Implements `StdBatchedStrDistancer` for a rapidfuzz `BatchComparator<char>`
// by forwarding to its inherent `distance` / `normalized_similarity` methods,
// converting the `&str` argument to a char iterator as rapidfuzz expects.
// NOTE(review): rapidfuzz's `distance` returns usize; the `as u32` cast
// truncates — presumably fine since string lengths here stay well below u32::MAX.
macro_rules! StdBatchedStrDistanceImpl {
    ($batch_struct: ty) => {
        impl StdBatchedStrDistancer for $batch_struct{
            fn distance(&self, s:&str) -> u32 {
                self.distance(s.chars()) as u32
            }

            fn normalized_similarity(&self, s:&str) -> f64 {
                self.normalized_similarity(s.chars())
            }
        }
    }
}

// One impl per metric whose batched comparator is used by the str extensions.
StdBatchedStrDistanceImpl!(lcs_seq::BatchComparator<char>);
StdBatchedStrDistanceImpl!(osa::BatchComparator<char>);
StdBatchedStrDistanceImpl!(levenshtein::BatchComparator<char>);
StdBatchedStrDistanceImpl!(damerau_levenshtein::BatchComparator<char>);
StdBatchedStrDistanceImpl!(jaro::BatchComparator<char>);

// -------------------------------------------------------------------------------------

/// Distance of every string in `ca` against a pre-built batched comparator.
/// When `parallel` is true, the column is split into one slice per rayon
/// worker thread and mapped concurrently; otherwise it is mapped in place.
/// Nulls are preserved (`apply_nonnull_values_generic` skips them).
pub fn generic_batched_distance<T>(
    batched: T,
    ca: &StringChunked,
    parallel: bool,
) -> Series
where
    T: StdBatchedStrDistancer + Sync,
{
    if !parallel {
        let out: UInt32Chunked =
            ca.apply_nonnull_values_generic(DataType::UInt32, |s| batched.distance(s));
        return out.into_series();
    }
    // One (offset, len) slice per worker; each slice is mapped independently
    // and the resulting arrow chunks are stitched back together in order.
    let splits = split_offsets(ca.len(), POOL.current_num_threads());
    let per_split = splits.into_par_iter().map(|(offset, len)| {
        let piece = ca.slice(offset as i64, len);
        let mapped: UInt32Chunked =
            piece.apply_nonnull_values_generic(DataType::UInt32, |s| batched.distance(s));
        mapped.downcast_iter().cloned().collect::<Vec<_>>()
    });
    let chunks = POOL.install(|| per_split.collect::<Vec<_>>());
    UInt32Chunked::from_chunk_iter(ca.name(), chunks.into_iter().flatten()).into_series()
}

/// Normalized similarity of every string in `ca` against a pre-built batched
/// comparator. Mirrors `generic_batched_distance` but produces Float64.
/// Nulls are preserved (`apply_nonnull_values_generic` skips them).
pub fn generic_batched_sim<T>(
    batched: T,
    ca: &StringChunked,
    parallel: bool,
) -> Series
where
    T: StdBatchedStrDistancer + Sync,
{
    if !parallel {
        let out: Float64Chunked = ca
            .apply_nonnull_values_generic(DataType::Float64, |s| batched.normalized_similarity(s));
        return out.into_series();
    }
    // Split the column across the rayon pool; reassemble chunks in order.
    let splits = split_offsets(ca.len(), POOL.current_num_threads());
    let per_split = splits.into_par_iter().map(|(offset, len)| {
        let piece = ca.slice(offset as i64, len);
        let mapped: Float64Chunked = piece
            .apply_nonnull_values_generic(DataType::Float64, |s| batched.normalized_similarity(s));
        mapped.downcast_iter().cloned().collect::<Vec<_>>()
    });
    let chunks = POOL.install(|| per_split.collect::<Vec<_>>());
    Float64Chunked::from_chunk_iter(ca.name(), chunks.into_iter().flatten()).into_series()
}

/// Element-wise distance between two string columns of equal length.
/// The caller guarantees `ca1.len() == ca2.len()`; rows where either side is
/// null come out null (`binary_elementwise_values` semantics).
pub fn generic_binary_distance(
    func: fn(&str, &str) -> u32,
    ca1: &StringChunked,
    ca2: &StringChunked,
    parallel: bool
) -> Series {
    if !parallel {
        let out: UInt32Chunked = binary_elementwise_values(ca1, ca2, func);
        return out.into_series();
    }
    // Slice both columns identically so paired rows stay aligned per worker.
    let splits = split_offsets(ca1.len(), POOL.current_num_threads());
    let per_split = splits.into_par_iter().map(|(offset, len)| {
        let left = ca1.slice(offset as i64, len);
        let right = ca2.slice(offset as i64, len);
        let mapped: UInt32Chunked = binary_elementwise_values(&left, &right, func);
        mapped.downcast_iter().cloned().collect::<Vec<_>>()
    });
    let chunks = POOL.install(|| per_split.collect::<Vec<_>>());
    UInt32Chunked::from_chunk_iter(ca1.name(), chunks.into_iter().flatten()).into_series()
}

/// Element-wise normalized similarity between two string columns of equal
/// length. Mirrors `generic_binary_distance` but produces Float64.
pub fn generic_binary_sim(
    func: fn(&str, &str) -> f64,
    ca1: &StringChunked,
    ca2: &StringChunked,
    parallel: bool
) -> Series {
    if !parallel {
        let out: Float64Chunked = binary_elementwise_values(ca1, ca2, func);
        return out.into_series();
    }
    // Slice both columns identically so paired rows stay aligned per worker.
    let splits = split_offsets(ca1.len(), POOL.current_num_threads());
    let per_split = splits.into_par_iter().map(|(offset, len)| {
        let left = ca1.slice(offset as i64, len);
        let right = ca2.slice(offset as i64, len);
        let mapped: Float64Chunked = binary_elementwise_values(&left, &right, func);
        mapped.downcast_iter().cloned().collect::<Vec<_>>()
    });
    let chunks = POOL.install(|| per_split.collect::<Vec<_>>());
    Float64Chunked::from_chunk_iter(ca1.name(), chunks.into_iter().flatten()).into_series()
}

53 changes: 13 additions & 40 deletions src/str_ext/jaro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use pyo3_polars::{
POOL,
},
};
use super::generic_str_distancer::{generic_batched_sim, generic_binary_sim};
use rapidfuzz::distance::{jaro, jaro_winkler};

#[inline]
Expand All @@ -34,38 +35,9 @@ fn pl_jaro(inputs: &[Series], context: CallerContext) -> PolarsResult<Series> {
if ca2.len() == 1 {
let r = ca2.get(0).unwrap();
let batched = jaro::BatchComparator::new(r.chars());
let out: Float64Chunked = if can_parallel {
let n_threads = POOL.current_num_threads();
let splits = split_offsets(ca1.len(), n_threads);
let chunks_iter = splits.into_par_iter().map(|(offset, len)| {
let s1 = ca1.slice(offset as i64, len);
let out: Float64Chunked = s1.apply_nonnull_values_generic(DataType::Float64, |s| {
batched.similarity(s.chars())
});
out.downcast_iter().cloned().collect::<Vec<_>>()
});
let chunks = POOL.install(|| chunks_iter.collect::<Vec<_>>());
Float64Chunked::from_chunk_iter(ca1.name(), chunks.into_iter().flatten())
} else {
ca1.apply_nonnull_values_generic(DataType::Float64, |s| batched.similarity(s.chars()))
};
Ok(out.into_series())
Ok(generic_batched_sim(batched, ca1, can_parallel))
} else if ca1.len() == ca2.len() {
let out: Float64Chunked = if can_parallel {
let n_threads = POOL.current_num_threads();
let splits = split_offsets(ca1.len(), n_threads);
let chunks_iter = splits.into_par_iter().map(|(offset, len)| {
let s1 = ca1.slice(offset as i64, len);
let s2 = ca2.slice(offset as i64, len);
let out: Float64Chunked = binary_elementwise_values(&s1, &s2, jaro_sim);
out.downcast_iter().cloned().collect::<Vec<_>>()
});
let chunks = POOL.install(|| chunks_iter.collect::<Vec<_>>());
Float64Chunked::from_chunk_iter(ca1.name(), chunks.into_iter().flatten())
} else {
binary_elementwise_values(ca1, ca2, jaro_sim)
};
Ok(out.into_series())
Ok(generic_binary_sim(jaro_sim, ca1, ca2, can_parallel))
} else {
Err(PolarsError::ShapeMismatch(
"Inputs must have the same length or one of them must be a scalar.".into(),
Expand Down Expand Up @@ -110,20 +82,21 @@ fn pl_jw(inputs: &[Series], context: CallerContext) -> PolarsResult<Series> {
};
Ok(out.into_series())
} else if ca1.len() == ca2.len() {
let out: Float64Chunked = if can_parallel {
let out: Float64Chunked = if parallel {
let n_threads = POOL.current_num_threads();
let splits = split_offsets(ca1.len(), n_threads);
let chunks_iter = splits.into_par_iter().map(|(offset, len)| {
let s1 = ca1.slice(offset as i64, len);
let s2 = ca2.slice(offset as i64, len);
let out: Float64Chunked =
binary_elementwise_values(&s1, &s2, |x, y| jw_sim(x, y, weight));
out.downcast_iter().cloned().collect::<Vec<_>>()
});
let chunks_iter= splits
.into_par_iter()
.map(|(offset, len)| {
let s1 = ca1.slice(offset as i64, len);
let s2 = ca2.slice(offset as i64, len);
let out: Float64Chunked = binary_elementwise_values(&s1, &s2, |s1, s2| jw_sim(s1, s2, weight));
out.downcast_iter().cloned().collect::<Vec<_>>()
});
let chunks = POOL.install(|| chunks_iter.collect::<Vec<_>>());
Float64Chunked::from_chunk_iter(ca1.name(), chunks.into_iter().flatten())
} else {
binary_elementwise_values(ca1, ca2, |x, y| jw_sim(x, y, weight))
binary_elementwise_values(ca1, ca2, |s1, s2| jw_sim(s1, s2, weight))
};
Ok(out.into_series())
} else {
Expand Down
59 changes: 59 additions & 0 deletions src/str_ext/lcs_seq.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use polars::prelude::*;
use pyo3_polars::derive::{polars_expr, CallerContext};
use super::generic_str_distancer::{
generic_batched_distance,
generic_batched_sim,
generic_binary_distance,
generic_binary_sim
};
use rapidfuzz::distance::lcs_seq;

/// LCS distance between two strings, compared char-by-char. Shadows the
/// `lcs_seq` module name only in the value namespace, so the module path in
/// the body still resolves.
#[inline(always)]
fn lcs_seq(s1: &str, s2: &str) -> u32 {
    let dist = lcs_seq::distance(s1.chars(), s2.chars());
    dist as u32
}

/// Normalized LCS similarity between two strings, compared char-by-char.
#[inline(always)]
fn lcs_seq_sim(s1: &str, s2: &str) -> f64 {
    let (a, b) = (s1.chars(), s2.chars());
    lcs_seq::normalized_similarity(a, b)
}

/// Plugin entry point: LCS distance. Supports column-vs-scalar broadcast
/// (when the second input has length 1) and column-vs-column of equal length.
#[polars_expr(output_type=UInt32)]
fn pl_lcs_seq(inputs: &[Series], context: CallerContext) -> PolarsResult<Series> {
    let ca1 = inputs[0].str()?;
    let ca2 = inputs[1].str()?;
    // Third input is a length-1 boolean flag sent from the Python wrapper.
    let parallel = inputs[2].bool()?.get(0).unwrap();
    // Do not spawn our own parallelism if the caller is already parallel.
    let can_parallel = parallel && !context.parallel();
    if ca2.len() == 1 {
        // Broadcast: compare the whole column against one reference string.
        let reference = ca2.get(0).unwrap();
        let cmp = lcs_seq::BatchComparator::new(reference.chars());
        Ok(generic_batched_distance(cmp, ca1, can_parallel))
    } else if ca1.len() == ca2.len() {
        Ok(generic_binary_distance(lcs_seq, ca1, ca2, can_parallel))
    } else {
        Err(PolarsError::ShapeMismatch(
            "Inputs must have the same length or one of them must be a scalar.".into(),
        ))
    }
}

/// Plugin entry point: normalized LCS similarity. Supports column-vs-scalar
/// broadcast (when the second input has length 1) and column-vs-column of
/// equal length.
///
/// Renamed from `pl_lcs_sim`: the Python wrapper `str_lcs_seq` registers this
/// expression under the symbol "pl_lcs_seq_sim", so the old name was never
/// resolvable at runtime. The new name also matches the sibling `pl_lcs_seq`.
#[polars_expr(output_type=Float64)]
fn pl_lcs_seq_sim(inputs: &[Series], context: CallerContext) -> PolarsResult<Series> {
    let ca1 = inputs[0].str()?;
    let ca2 = inputs[1].str()?;
    // Third input is a length-1 boolean flag sent from the Python wrapper.
    let parallel = inputs[2].bool()?.get(0).unwrap();
    // Do not spawn our own parallelism if the caller is already parallel.
    let can_parallel = parallel && !context.parallel();
    if ca2.len() == 1 {
        // Broadcast: compare the whole column against one reference string.
        let r = ca2.get(0).unwrap();
        let batched = lcs_seq::BatchComparator::new(r.chars());
        Ok(generic_batched_sim(batched, ca1, can_parallel))
    } else if ca1.len() == ca2.len() {
        Ok(generic_binary_sim(lcs_seq_sim, ca1, ca2, can_parallel))
    } else {
        Err(PolarsError::ShapeMismatch(
            "Inputs must have the same length or one of them must be a scalar.".into(),
        ))
    }
}
Loading

0 comments on commit e34c8fc

Please sign in to comment.