Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Documentation and Test-Case for DataFrame Upload using CSV.RowIterator #186

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
6 changes: 4 additions & 2 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ TimeZones = "f269a46b-ccf7-5d73-abea-4c690281aa53"
[compat]
BinaryProvider = "0.5"
CEnum = "0.2, 0.3, 0.4"
DataFrames = "0.20, 0.21"
CSV = "0.6.2, 0.7"
DataFrames = "0.21"
Decimals = "0.4.1"
DocStringExtensions = "0.8.0"
Infinity = "0.2"
Expand All @@ -39,8 +40,9 @@ TimeZones = "0.9.2, 0.10, 0.11, 1"
julia = "1"

[extras]
CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b"
DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"

[targets]
test = ["Test", "DataFrames"]
test = ["Test", "DataFrames", "CSV"]
31 changes: 21 additions & 10 deletions docs/src/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,21 +77,32 @@ An alternative to repeated `INSERT` queries is the PostgreSQL `COPY` query.
`LibPQ.CopyIn` makes it easier to stream data to the server using a `COPY FROM STDIN` query.

```julia
using LibPQ, DataFrames
using LibPQ, DataFrames, CSV

conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER")

row_strings = imap(eachrow(df)) do row
if ismissing(row[:yes_nulls])
"$(row[:no_nulls]),\n"
else
"$(row[:no_nulls]),$(row[:yes_nulls])\n"
end
end
result = execute(conn, """
CREATE TEMPORARY TABLE libpqjl_test (
no_nulls varchar(10) PRIMARY KEY,
yes_nulls varchar(10)
);
""")

copyin = LibPQ.CopyIn("COPY libpqjl_test FROM STDIN (FORMAT CSV);", row_strings)
no_nulls = map(string, 'a':'z')
yes_nulls = Union{String, Missing}[isodd(Int(c)) ? string(c) : missing for c in 'a':'z']
data = DataFrame(no_nulls=no_nulls, yes_nulls=yes_nulls)

"""
Function for upload of a Tables.jl compatible data structure (e.g. DataFrames.jl) into the db.
"""
function load_by_copy!(table, conn:: LibPQ.Connection, tablename:: AbstractString)
iter = CSV.RowWriter(table)
column_names = first(iter)
copyin = LibPQ.CopyIn("COPY $tablename ($column_names) FROM STDIN (FORMAT CSV, HEADER);", iter)
execute(conn, copyin)
end

execute(conn, copyin)
load_by_copy!(data, conn, "libpqjl_test")

close(conn)
```
48 changes: 48 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ using Memento.TestUtils
using OffsetArrays
using TimeZones
using Tables
using CSV

Memento.config!("critical")

Expand All @@ -31,6 +32,27 @@ macro test_nolog_on_windows(ex...)
end
end

"""
load_by_copy!(table, con:: LibPQ.Connection, tablename:: AbstractString)

Fast data upload using the PostgreSQL `COPY FROM STDIN` method, which is usually much faster,
especially for large data amounts, than SQL Inserts.

`table` must be a Tables.jl compatible data structure.

All columns given in `table` must have corresponding fields in the target DB table,
the order of the columns does not matter.

Columns in the target DB table, which are not provided by the input `table`, are filled
with `null` (provided they are nullable).
"""
function load_by_copy!(table, conn::LibPQ.Connection, table_name::AbstractString)
iter = CSV.RowWriter(table)
column_names = first(iter)
copyin = LibPQ.CopyIn("COPY $table_name ($column_names) FROM STDIN (FORMAT CSV, HEADER);", iter)
execute(conn, copyin)
end

@testset "LibPQ" begin

@testset "ConninfoDisplay" begin
Expand Down Expand Up @@ -316,6 +338,32 @@ end
table_data = DataFrame(result)
@test isequal(table_data, data)
close(result)
close(conn)

# testing loading to database using CSV.jl row iterator
conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER")
result = execute(conn, """
CREATE TEMPORARY TABLE libpqjl_test (
no_nulls varchar(10) PRIMARY KEY,
yes_nulls varchar(10)
);
""")

result = load_by_copy!(data, conn, "libpqjl_test")
@test isopen(result)
@test status(result) == LibPQ.libpq_c.PGRES_COMMAND_OK
@test isempty(LibPQ.error_message(result))
close(result)

result = execute(
conn,
"SELECT no_nulls, yes_nulls FROM libpqjl_test ORDER BY no_nulls ASC;";
throw_error=true
)
table_data = DataFrame(result)
@test isequal(table_data, data)
close(result)


close(conn)
end
Expand Down