diff --git a/datahost-ld-openapi/deps.edn b/datahost-ld-openapi/deps.edn index b7e3cdaa..e00b8d96 100644 --- a/datahost-ld-openapi/deps.edn +++ b/datahost-ld-openapi/deps.edn @@ -1,29 +1,36 @@ {:paths ["resources" "src"] :deps {datahost/shared {:local/root "../shared-deps"} - clj-http/clj-http {:mvn/version "3.12.3"} - io.github.swirrl/grafter.repository {:mvn/version "3.0.0"} - grafter/matcha.alpha {:mvn/version "0.4.0"} - dev.weavejester/medley {:mvn/version "1.7.0"} + clj-http/clj-http {:mvn/version "3.12.3"} + io.github.swirrl/grafter.repository {:mvn/version "3.0.0"} + grafter/matcha.alpha {:mvn/version "0.4.0"} + dev.weavejester/medley {:mvn/version "1.7.0"} - com.yetanalytics/flint {:mvn/version "0.2.1" - :exclusions [org.clojure/clojure - org.clojure/clojurescript]} + com.yetanalytics/flint {:mvn/version "0.2.1" + :exclusions [org.clojure/clojure + org.clojure/clojurescript]} - org.clojure/data.json {:mvn/version "2.4.0"} - metosin/malli {:mvn/version "0.11.0"} - metosin/reitit {:mvn/version "0.7.0-alpha4"} - metosin/ring-http-response {:mvn/version "0.9.3"} - ring-cors/ring-cors {:mvn/version "0.1.13"} - duratom/duratom {:mvn/version "0.5.8"} - camel-snake-kebab/camel-snake-kebab {:mvn/version "0.4.3"} - metosin/jsonista {:mvn/version "0.3.7"} - org.glassfish/jakarta.json {:mvn/version "2.0.1"} - com.apicatalog/titanium-json-ld {:mvn/version "1.3.2"} - scicloj/tablecloth {:mvn/version "7.000-beta-50"} - net.openhft/zero-allocation-hashing {:mvn/version "0.16"} - metrics-clojure/metrics-clojure {:mvn/version "2.10.0"} - com.google.cloud/google-cloud-secretmanager {:mvn/version "2.23.0"}} + org.clojure/data.json {:mvn/version "2.4.0"} + metosin/malli {:mvn/version "0.11.0"} + metosin/reitit {:mvn/version "0.7.0-alpha4"} + metosin/ring-http-response {:mvn/version "0.9.3"} + ring-cors/ring-cors {:mvn/version "0.1.13"} + duratom/duratom {:mvn/version "0.5.8"} + camel-snake-kebab/camel-snake-kebab {:mvn/version "0.4.3"} + metosin/jsonista {:mvn/version "0.3.7"} + org.glassfish/jakarta.json {:mvn/version "2.0.1"} + com.apicatalog/titanium-json-ld {:mvn/version "1.3.2"} + scicloj/tablecloth {:mvn/version "7.000-beta-50"} + net.openhft/zero-allocation-hashing {:mvn/version "0.16"} + metrics-clojure/metrics-clojure {:mvn/version "2.10.0"} + com.google.cloud/google-cloud-secretmanager {:mvn/version "2.23.0"} + + com.github.seancorfield/next.jdbc {:mvn/version "1.3.894"} + com.layerware/hugsql-core {:mvn/version "0.5.3"} + com.layerware/hugsql-adapter-next-jdbc {:mvn/version "0.5.3"} + + org.xerial/sqlite-jdbc {:mvn/version "3.44.1.0"} + com.h2database/h2 {:mvn/version "2.2.224"}} @@ -31,26 +38,26 @@ :aliases {:run {:main-opts ["-m" "tpximpact.datahost.ldapi"]} - :auth/basic {:extra-paths ["env/auth/resources"]} + :auth/basic {:extra-paths ["env/auth/resources"]} - :ldapi/docker {:extra-paths ["env/docker/resources" "env/auth/resources"] - :jvm-opts ["-Xmx4g" - "-Dcom.sun.management.jmxremote.ssl=false" - "-Dcom.sun.management.jmxremote.authenticate=false" - "-Dcom.sun.management.jmxremote.port=3007" - ;;"-Dlog4j.configuration=log4j2-docker.xml" - ;;"-Dlog4j2.debug=true" - ] + :ldapi/docker {:extra-paths ["env/docker/resources" "env/auth/resources"] + :jvm-opts ["-Xmx4g" + "-Dcom.sun.management.jmxremote.ssl=false" + "-Dcom.sun.management.jmxremote.authenticate=false" + "-Dcom.sun.management.jmxremote.port=3007" + ;;"-Dlog4j.configuration=log4j2-docker.xml" + ;;"-Dlog4j2.debug=true" + ] - :main-opts ["-m" "tpximpact.datahost.ldapi"]} + :main-opts ["-m" "tpximpact.datahost.ldapi"]} - :build 
{:deps {io.github.clojure/tools.build {:git/tag "v0.9.4" :git/sha "76b78fe"}
-                io.github.seancorfield/build-clj {:git/tag "v0.9.2" :git/sha "9c9f078"}
+   :build {:deps {io.github.clojure/tools.build {:git/tag "v0.9.4" :git/sha "76b78fe"}
+                  io.github.seancorfield/build-clj {:git/tag "v0.9.2" :git/sha "9c9f078"}
-                io.github.juxt/pack.alpha {:git/sha "802b3d6347376db51093d122eb4b8cf8a7bbd7cf"}
-                com.google.cloud.tools/jib-core {:mvn/version "0.23.0"}
-                }
-        :ns-default build}
+                  io.github.juxt/pack.alpha {:git/sha "802b3d6347376db51093d122eb4b8cf8a7bbd7cf"}
+                  com.google.cloud.tools/jib-core {:mvn/version "0.23.0"}
+                  }
+          :ns-default build}

 :test {:extra-paths ["test" "env/test/resources"]
        :extra-deps {lambdaisland/kaocha {:mvn/version "1.85.1342"}
@@ -67,19 +74,23 @@
 :test-watch {:extra-deps {lambdaisland/kaocha {:mvn/version "1.85.1342"}}
              :exec-fn kaocha.runner/exec-fn
              :exec-args {:watch? true
-                        :skip-meta [:pending]
-                        :fail-fast? true}}
+                         :skip-meta [:pending]
+                         :fail-fast? true}}

 :test/unit {:exec-args {:tests [{:id :unit
-                                :skip-meta [:hurl]}]}}
+                                 :skip-meta [:hurl]}]}}

 :test/integration {:exec-args {:tests [{:id :integration
-                                       :focus-meta [:hurl]}]
-                              :plugins []
-                              :reporter kaocha.report/documentation}}
+                                        :focus-meta [:hurl]}]
+                               :plugins []
+                               :reporter kaocha.report/documentation}}
+ :test/sql {:exec-args {:tests [{:id :sql
+                                 :focus-meta [:sql]}]
+                        :plugins []
+                        :reporter kaocha.report/documentation}}

- :dev {:extra-paths ["env/dev/src" "env/test/resources" "test"]
-      :extra-deps {integrant/repl {:mvn/version "0.3.2"}
-                   org.clojure/test.check {:mvn/version "1.1.1"}
-                   grafter/vocabularies {:mvn/version "0.3.9"}
-                   vvvvalvalval/scope-capture {:mvn/version "0.3.3"}
+ :dev {:extra-paths ["env/dev/src" "env/test/resources" "test"]
+       :extra-deps {integrant/repl {:mvn/version "0.3.2"}
+                    org.clojure/test.check {:mvn/version "1.1.1"}
+                    grafter/vocabularies {:mvn/version "0.3.9"}
+                    vvvvalvalval/scope-capture {:mvn/version "0.3.3"}
                     org.clojure/data.csv {:mvn/version "1.0.1"}}
-      :jvm-opts ["-Dclojure.tools.logging.factory=clojure.tools.logging.impl/log4j2-factory"]}}}
+       :jvm-opts ["-Dclojure.tools.logging.factory=clojure.tools.logging.impl/log4j2-factory"]}}}
diff --git a/datahost-ld-openapi/doc/decision-log.md b/datahost-ld-openapi/doc/decision-log.md
index fa635b02..3e6dcb86 100644
--- a/datahost-ld-openapi/doc/decision-log.md
+++ b/datahost-ld-openapi/doc/decision-log.md
@@ -29,3 +29,11 @@ The latest agreements about the entity/update model are these:
 **Re: Fuseki**
 - We'll stick with the RDF4J native store mounted on its own volume as the triplestore for now
 - We realized fuseki/JENA likely won't be the correct solution for the full production system anyway, and the RDF4J native store can meet our needs for this phase, so there is no point in adding more complexity to our deployment and development by adding a separate db right now
+
+## December 14, 2023
+
+**Re: Using SQL for dataset storage**
+- Tablecloth's in-memory data structures are convenient, but they obviously limit the size of the ingested data.
+- We used generated data (see [issue #314](https://github.com/Swirrl/datahost-prototypes/issues/314)) to import up to 10 million rows
+  using H2 and SQLite databases in a spike (Tablecloth couldn't go beyond 3 million rows on an M2 MacBook Pro with 16GB RAM).
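+- For reference, the SQL store is wired up through next.jdbc using specs like the ones
+  now in `base-system.edn` / `test-system.edn` (`jdbc:sqlite:./tmp/sqlite.observationdb`,
+  `jdbc:sqlite::memory:`). A minimal sketch of such a connection (the `observations`
+  table below is illustrative only, not the real schema):
+
+  ```clojure
+  (require '[next.jdbc :as jdbc])
+
+  (with-open [conn (jdbc/get-connection (jdbc/get-datasource "jdbc:sqlite::memory:"))]
+    (jdbc/execute-one! conn ["create table observations (id integer primary key, estimate real)"])
+    (jdbc/execute-one! conn ["insert into observations (estimate) values (?)" 103.5])
+    (jdbc/execute-one! conn ["select count(*) as n from observations"]))
+  ;; => {:n 1} (key shape depends on the row builder)
+  ```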
+ \ No newline at end of file diff --git a/datahost-ld-openapi/env/test/resources/test-system.edn b/datahost-ld-openapi/env/test/resources/test-system.edn index b0b416de..e36e61a1 100644 --- a/datahost-ld-openapi/env/test/resources/test-system.edn +++ b/datahost-ld-openapi/env/test/resources/test-system.edn @@ -1,5 +1,8 @@ {:tpximpact.datahost.ldapi.jetty/http-port #int #or [#env "LD_API_HTTP_PORT" "3400"] :tpximpact.datahost.ldapi.native-datastore.repo/data-directory #or [#env "LD_API_TEST_DATA_DIR" "./tmp/ld-test-db"] + :tpximpact.datahost.ldapi.store.sql/db-config {:spec "jdbc:sqlite::memory:" + :user "SA" + :password ""} ;; test rdf-base-uri intentionally uses a weird value that doesn't include `data` ;; in order to prove that RDF URIs are not necessarily tied to routing. @@ -14,4 +17,30 @@ :tpximpact.datahost.ldapi.store.temp-file-store/store {} :tpximpact.datahost.ldapi.router/handler - {:change-store #ig/ref :tpximpact.datahost.ldapi.store.temp-file-store/store}} + {:change-store #ig/ref :tpximpact.datahost.ldapi.store.temp-file-store/store} + + :tpximpact.datahost.ldapi.test/sqlite-connection + {:db-config #ig/ref :tpximpact.datahost.ldapi.store.sql/db-config} + + :tpximpact.datahost.ldapi.store.sql/store-factory + {:db-config #ig/ref :tpximpact.datahost.ldapi.store.sql/db-config + :connection #ig/ref :tpximpact.datahost.ldapi.test/sqlite-connection + :data-source #ig/ref :tpximpact.datahost.ldapi.test/data-source + :db-executor #ig/ref :tpximpact.datahost.ldapi.store.sql/executor} + + :tpximpact.datahost.ldapi.test/data-source + {:connection #ig/ref :tpximpact.datahost.ldapi.test/sqlite-connection} + + :tpximpact.datahost.ldapi.test/sql-db + {:db-config #ig/ref :tpximpact.datahost.ldapi.store.sql/db-config + :store-factory #ig/ref :tpximpact.datahost.ldapi.store.sql/store-factory + :connection #ig/ref :tpximpact.datahost.ldapi.test/sqlite-connection + :db-executor #ig/ref :tpximpact.datahost.ldapi.store.sql/executor + :data-source #ig/ref :tpximpact.datahost.ldapi.test/data-source} + + ;; method for this key will create the necessary tables + ;; and make them available for test + :tpximpact.datahost.ldapi.models.release/common-tables + {:db {:db-config #ig/ref :tpximpact.datahost.ldapi.store.sql/db-config + :db-executor #ig/ref :tpximpact.datahost.ldapi.store.sql/executor + :data-source #ig/ref :tpximpact.datahost.ldapi.test/data-source}}} diff --git a/datahost-ld-openapi/hurl-scripts/common/schema-name-age-2.json b/datahost-ld-openapi/hurl-scripts/common/schema-name-age-2.json new file mode 100644 index 00000000..f33e81e4 --- /dev/null +++ b/datahost-ld-openapi/hurl-scripts/common/schema-name-age-2.json @@ -0,0 +1,20 @@ +{ + "@type": "dh:TableSchema", + "appropriate-csvw:modeling-of-dialect": "UTF-8,RFC4180", + "dh:appliesToRelease": "http://localhost:3000/data/test-032/releases/release-1", + "dcterms:title": "Fun schema, different title", + "dh:columns": [ + { + "@type": "dh:DimensionColumn", + "csvw:datatype": "string", + "csvw:name": "name", + "csvw:titles": "name" + }, + { + "@type": "dh:MeasureColumn", + "csvw:datatype": "int", + "csvw:name": "age", + "csvw:titles": "age" + } + ] +} \ No newline at end of file diff --git a/datahost-ld-openapi/hurl-scripts/issue-282.hurl b/datahost-ld-openapi/hurl-scripts/common/setup-area+age-range+estimate.hurl similarity index 76% rename from datahost-ld-openapi/hurl-scripts/issue-282.hurl rename to datahost-ld-openapi/hurl-scripts/common/setup-area+age-range+estimate.hurl index b17a0fcd..707ef2aa 100644 --- 
a/datahost-ld-openapi/hurl-scripts/issue-282.hurl
+++ b/datahost-ld-openapi/hurl-scripts/common/setup-area+age-range+estimate.hurl
@@ -1,6 +1,3 @@
-# Description: We try to append a data with incorrectly formatted 'double' value,
-# and that should be rejected by the API.
-
 PUT {{scheme}}://{{host_name}}/data/{{series}}
 Content-Type: application/json
 {"dcterms:description": "Crimes Data - Float coercion test",
@@ -63,23 +60,4 @@ HTTP 201
 revision_url: header "Location"

 GET {{scheme}}://{{host_name}}{{revision_url}}
-Accept: application/json
-
-HTTP 200
-
-## One append
-
-POST {{scheme}}://{{host_name}}{{revision_url}}/appends
-Accept: application/json
-Content-Type: text/csv
-[QueryStringParams]
-description: Add data for liverpool
-format: text/csv
-```
-area,age_range,estimate
-liverpool,16-20,103.5
-manchester,16-20,invalid_double
-leeds,16-20,99.2
-```
-
-HTTP 400
+Accept: application/json
\ No newline at end of file
diff --git a/datahost-ld-openapi/hurl-scripts/int-003.hurl b/datahost-ld-openapi/hurl-scripts/int-003.hurl
new file mode 100644
index 00000000..9715241e
--- /dev/null
+++ b/datahost-ld-openapi/hurl-scripts/int-003.hurl
@@ -0,0 +1,31 @@
+# Description: Basic smoke tests for release data.
+
+
+GET {{scheme}}://{{host_name}}/data/{{series}}/release/release-1-metadata.json
+
+HTTP 404
+[Asserts]
+body matches /Not found/
+
+PUT {{scheme}}://{{host_name}}/data/{{series}}
+Accept: application/json
+Content-Type: application/json
+{
+    "dcterms:title": "Test Series",
+    "dcterms:description": "A very simple series"
+}
+
+HTTP 201
+[Captures]
+dataset: jsonpath "$['dh:baseEntity']"
+
+PUT {{scheme}}://{{host_name}}/data/{{series}}/release/release-1
+Accept: application/json
+Content-Type: application/json
+{
+    "dcterms:title": "Test Release",
+    "dcterms:description": "A very simple Release"
+}
+
+HTTP 201
+
diff --git a/datahost-ld-openapi/hurl-scripts/int-005/int-005.hurl b/datahost-ld-openapi/hurl-scripts/int-005/int-005.hurl
new file mode 100644
index 00000000..dc975165
--- /dev/null
+++ b/datahost-ld-openapi/hurl-scripts/int-005/int-005.hurl
@@ -0,0 +1,26 @@
+# Description: Ensure we accept 'double' values formatted without the decimal point.
+
+
+POST {{scheme}}://{{host_name}}/data/{{series}}/release/{{release}}/revision/1/appends
+Accept: application/json
+Content-Type: text/csv
+[QueryStringParams]
+description: Add data for liverpool
+format: text/csv
+```
+area,age_range,estimate
+liverpool,16-20,103.5
+manchester,16-20,100
+leeds,16-20,99.2
+```
+
+HTTP 201
+
+GET {{scheme}}://{{host_name}}/data/{{series}}/release/{{release}}/revision/1
+Accept: text/csv
+
+# Let's ensure the double '100' does not get turned into an integer
+
+HTTP 200
+[Asserts]
+body matches /manchester,16-20,100.0/
diff --git a/datahost-ld-openapi/hurl-scripts/int-005/setup.ref b/datahost-ld-openapi/hurl-scripts/int-005/setup.ref
new file mode 100644
index 00000000..7dc383ed
--- /dev/null
+++ b/datahost-ld-openapi/hurl-scripts/int-005/setup.ref
@@ -0,0 +1 @@
+common/setup-area+age-range+estimate.hurl
\ No newline at end of file
diff --git a/datahost-ld-openapi/hurl-scripts/int-299.hurl b/datahost-ld-openapi/hurl-scripts/int-299.hurl
new file mode 100644
index 00000000..d0c206b4
--- /dev/null
+++ b/datahost-ld-openapi/hurl-scripts/int-299.hurl
@@ -0,0 +1,130 @@
+# Description: test that creates two series with their own releases, revisions
+# and a shared schema. Both contain a change with the same contents - these
+# changes should have the same address within the store. The test deletes one of
The test deletes one of +# the series (and therefore its associated change) and checks that the contents +# of the change in the other series can still be fetched + +PUT {{scheme}}://{{host_name}}/data/{{series}}-1 +Accept: application/json +Content-Type: application/json +Authorization: {{auth_token}} +{ + "dcterms:title": "Series 1", + "dcterms:description": "A very simple test" +} + +HTTP 201 + +PUT {{scheme}}://{{host_name}}/data/{{series}}-2 +Accept: application/json +Content-Type: application/json +Authorization: {{auth_token}} +{ + "dcterms:title": "Series 2", + "dcterms:description": "A very simple test" +} + +HTTP 201 + +PUT {{scheme}}://{{host_name}}/data/{{series}}-1/release/release-1 +Accept: application/json +Content-Type: application/json +Authorization: {{auth_token}} +{ + "dcterms:title": "Test Release", + "dcterms:description": "A very simple Release" +} + +HTTP 201 + +PUT {{scheme}}://{{host_name}}/data/{{series}}-2/release/release-1 +Accept: application/json +Content-Type: application/json +Authorization: {{auth_token}} +{ + "dcterms:title": "Test Release", + "dcterms:description": "A very simple Release" +} + +HTTP 201 + +POST {{scheme}}://{{host_name}}/data/{{series}}-1/release/release-1/schema +Content-Type: application/json +Authorization: {{auth_token}} +file,common/schema-name-age.json; + +HTTP 201 + +POST {{scheme}}://{{host_name}}/data/{{series}}-2/release/release-1/schema +Content-Type: application/json +Authorization: {{auth_token}} +file,common/schema-name-age-2.json; + +HTTP 201 + +POST {{scheme}}://{{host_name}}/data/{{series}}-1/release/release-1/revisions +Accept: application/json +Content-Type: application/json +Authorization: {{auth_token}} +{ + "dcterms:title": "Rev 1", + "dcterms:description": "A test revision" +} + +HTTP 201 +[Captures] +revision1_url: header "Location" + +POST {{scheme}}://{{host_name}}/data/{{series}}-2/release/release-1/revisions +Accept: application/json +Content-Type: application/json +Authorization: {{auth_token}} +{ + "dcterms:title": "Rev 1", + "dcterms:description": "A test revision" +} + +HTTP 201 +[Captures] +revision2_url: header "Location" + +# confirm we can submit data +POST {{scheme}}://{{host_name}}{{revision1_url}}/appends +Content-Type: text/csv +Authorization: {{auth_token}} +[QueryStringParams] +title: changes appends +description: change for {{series}}-1 +file,common/name-age-01.csv; + +HTTP 201 + +# confirm we can submit data +POST {{scheme}}://{{host_name}}{{revision2_url}}/appends +Content-Type: text/csv +Authorization: {{auth_token}} +[QueryStringParams] +title: changes appends +description: change for {{series}}-2 +file,common/name-age-01.csv; + +HTTP 201 + +DELETE {{scheme}}://{{host_name}}/data/{{series}}-1 + +HTTP 204 + +# Verify data for the 'other' release is still there +GET {{scheme}}://{{host_name}}/data/{{series}}-2/release/release-1/revision/1/commit/1 +Accept: text/csv +Content-Type: application/json +Authorization: {{auth_token}} + +HTTP 200 +[Asserts] +``` +name,age +bob,25 +cindy,29 +max,33 +``` diff --git a/datahost-ld-openapi/hurl-scripts/issue-261.hurl b/datahost-ld-openapi/hurl-scripts/issue-261.hurl.disabled similarity index 100% rename from datahost-ld-openapi/hurl-scripts/issue-261.hurl rename to datahost-ld-openapi/hurl-scripts/issue-261.hurl.disabled diff --git a/datahost-ld-openapi/hurl-scripts/issue-282/issue-282.hurl b/datahost-ld-openapi/hurl-scripts/issue-282/issue-282.hurl new file mode 100644 index 00000000..b7f9b979 --- /dev/null +++ 
b/datahost-ld-openapi/hurl-scripts/issue-282/issue-282.hurl
@@ -0,0 +1,17 @@
+# Description: We try to append data with an incorrectly formatted 'double' value,
+# and that should be rejected by the API.
+
+# append with invalid double value
+
+POST {{scheme}}://{{host_name}}/data/{{series}}/release/{{release}}/revision/1/appends
+Accept: application/json
+Content-Type: text/csv
+[QueryStringParams]
+description: Add data for liverpool
+format: text/csv
+```
+area,age_range,estimate
+manchester,16-20,invalid_double
+```
+
+HTTP 422
diff --git a/datahost-ld-openapi/hurl-scripts/issue-282/setup.ref b/datahost-ld-openapi/hurl-scripts/issue-282/setup.ref
new file mode 100644
index 00000000..7dc383ed
--- /dev/null
+++ b/datahost-ld-openapi/hurl-scripts/issue-282/setup.ref
@@ -0,0 +1 @@
+common/setup-area+age-range+estimate.hurl
\ No newline at end of file
diff --git a/datahost-ld-openapi/hurl-scripts/issue-299.hurl b/datahost-ld-openapi/hurl-scripts/issue-299.hurl
index 7346af8e..e2b536d5 100644
--- a/datahost-ld-openapi/hurl-scripts/issue-299.hurl
+++ b/datahost-ld-openapi/hurl-scripts/issue-299.hurl
@@ -1,3 +1,6 @@
+# Description: Deleting a series with an associated schema worked incorrectly and did not entirely remove the schema.
+# Future schemas could be polluted with the old schemas.
+
 #Create series
 PUT {{scheme}}://{{host_name}}/data/{{series}}
 Accept: application/json
@@ -9,8 +12,6 @@ Authorization: {{auth_token}}
 }

 HTTP 201
-[Captures]
-dataset: jsonpath "$['dh:baseEntity']"

 #Create release
 PUT {{scheme}}://{{host_name}}/data/{{series}}/release/release-1
@@ -24,7 +25,6 @@ Authorization: {{auth_token}}

 HTTP 201

-
 #Add schema to release
 POST {{scheme}}://{{host_name}}/data/{{series}}/release/release-1/schema
 Content-Type: application/json
@@ -73,7 +73,8 @@ DELETE {{scheme}}://{{host_name}}/data/{{series}}

 HTTP 204

-#Create series
+#Create series (with the same name as the one we just deleted)
+
 PUT {{scheme}}://{{host_name}}/data/{{series}}
 Accept: application/json
 Content-Type: application/json
@@ -84,9 +85,6 @@ Authorization: {{auth_token}}
 }

 HTTP 201
-[Captures]
-dataset: jsonpath "$['dh:baseEntity']"
-
 #Create release
 PUT {{scheme}}://{{host_name}}/data/{{series}}/release/release-1
diff --git a/datahost-ld-openapi/resources/ldapi/base-system.edn b/datahost-ld-openapi/resources/ldapi/base-system.edn
index b74d1c95..1c46e758 100644
--- a/datahost-ld-openapi/resources/ldapi/base-system.edn
+++ b/datahost-ld-openapi/resources/ldapi/base-system.edn
@@ -2,6 +2,12 @@
 :tpximpact.datahost.ldapi/drafter-base-uri "https://idp-beta-drafter.publishmydata.com/"
 :tpximpact.datahost.ldapi.jetty/http-port 3000

+ :tpximpact.datahost.ldapi.store.sql/db-config {;;:spec "jdbc:h2:./tmp/h2.observationdb"
+                                                :spec "jdbc:sqlite:./tmp/sqlite.observationdb"
+                                                :dbtype "sqlite"
+                                                :user "SA"
+                                                :password ""}
+
 ;; Base URI should end in a trailing slash
 :tpximpact.datahost.system-uris/uris {:rdf-base-uri #uri "https://example.org/data/"}
@@ -17,18 +23,35 @@
 :ide-path "/ld-api"
 :default-subscriptions-path "/ws"}

+ :tpximpact.datahost.ldapi.models.release/queries {}
 :tpximpact.datahost.ldapi.native-datastore/repo
 {:data-directory "./tmp/ld-dev-db"}

 :tpximpact.datahost.ldapi.store.file/store
 {:directory "./tmp/file-store"}
-
+ :tpximpact.datahost.ldapi.store.sql/executor {:config {}}
+ :tpximpact.datahost.ldapi.store.sql/data-source {:db-config #ig/ref :tpximpact.datahost.ldapi.store.sql/db-config}
+ :tpximpact.datahost.ldapi.store.sql/store-factory
+ {:db-config #ig/ref :tpximpact.datahost.ldapi.store.sql/db-config
+
:data-source #ig/ref :tpximpact.datahost.ldapi.store.sql/data-source + :db-executor #ig/ref :tpximpact.datahost.ldapi.store.sql/executor} + :tpximpact.datahost.ldapi.router/handler {:triplestore #ig/ref :tpximpact.datahost.ldapi.native-datastore/repo :clock #ig/ref :tpximpact.datahost.time/system-clock :change-store #ig/ref :tpximpact.datahost.ldapi.store.file/store :system-uris #ig/ref :tpximpact.datahost.system-uris/uris - :base-path #ig/ref :tpximpact.datahost.ldapi/base-path} + :base-path #ig/ref :tpximpact.datahost.ldapi/base-path + :store-factory #ig/ref :tpximpact.datahost.ldapi.store.sql/store-factory + :db-config #ig/ref :tpximpact.datahost.ldapi.store.sql/db-config + :db-executor #ig/ref :tpximpact.datahost.ldapi.store.sql/executor + :data-source #ig/ref :tpximpact.datahost.ldapi.store.sql/data-source} :tpximpact.datahost.time/system-clock {} :tpximpact.datahost.ldapi.metrics/reporter {:enabled? false :period 20 :time-unit :seconds} + + ;; TODO: move the below into a 'dev.edn' config - it's a development convenience + ;; and should not be present in the deployed app. + :tpximpact.datahost.ldapi.models.release/common-tables + {:db {:db-config #ig/ref :tpximpact.datahost.ldapi.store.sql/db-config + :db-executor #ig/ref :tpximpact.datahost.ldapi.store.sql/executor}} } diff --git a/datahost-ld-openapi/src/tpximpact/datahost/ldapi.clj b/datahost-ld-openapi/src/tpximpact/datahost/ldapi.clj index 384cd13c..07a8b242 100644 --- a/datahost-ld-openapi/src/tpximpact/datahost/ldapi.clj +++ b/datahost-ld-openapi/src/tpximpact/datahost/ldapi.clj @@ -33,6 +33,8 @@ (derive :tpximpact.datahost.ldapi.native-datastore.repo/data-directory ::const) +(derive :tpximpact.datahost.ldapi.store.sql/db-config ::const) + (def system nil) (defn stop-system! [] diff --git a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/db.clj b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/db.clj index 0277aa73..cbca1051 100644 --- a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/db.clj +++ b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/db.clj @@ -631,7 +631,7 @@ [uri snapshot-key] [(pr/->Triple uri (compact/expand :dh/revisionSnapshotCSV) snapshot-key)]) -(defn tag-with-snapshot +(defn tag-with-snapshot ;TODO(rosado): think whether this is still needed. "Updates the entity under uri with dataset snapshot information. Assumes m contains keys: @@ -697,7 +697,7 @@ {:message STRING} when an insert could not be performed." [triplestore system-uris - {:keys [api-params ld-root store-key datahost.change/kind] + {:keys [api-params store-key datahost.change/kind] {rev-uri :dh/Revision} :datahost.request/uris}] {:pre [(some? kind) (some? store-key) (some? rev-uri)]} (time! @@ -708,6 +708,7 @@ change-uri (su/commit-uri* system-uris (assoc api-params :commit-id change-id)) _ (log/debug (format "will insert-change for '%s', new change id = %s" (.getPath ^URI rev-uri) change-id)) + ld-root (su/rdf-base-uri system-uris) change (request->change kind api-params ld-root rev-uri change-uri) change (resource/set-property1 change (compact/expand :dh/updates) store-key) {:keys [before after]} (do @@ -720,27 +721,11 @@ :inserted-jsonld-doc (resource/->json-ld change (output-context ["dh" "dcterms" "rdf"] ld-root))} {:message "Change already exists."}))))) -(defn- previous-change-coords - "Given revision and change id, tries to find the preceding change's - revision and change ids. If not possible, indicates an extra DB - lookup is needed. - - Returns a [:new :new] | [rev-id change-id] | [rev-id :find]." 
- [revision-id change-id] - {:pre [(pos? revision-id) (pos? change-id)]} - (cond - (and (= revision-id 1) (= 1 change-id)) [:new :new] - - (< 1 change-id) [revision-id (dec change-id)] - - ;; we need to find the last change-id in previous revision - :else [(dec revision-id) :find])) - (defn get-previous-change "Returns the previous change (as Quads), nil when no previous change exists." [triplestore system-uris {:keys [revision-id change-id] :as params}] (let [revision-uri ^URI (su/dataset-revision-uri* system-uris params) - [prev-rev-id c] (previous-change-coords revision-id change-id)] + [prev-rev-id c] (su/previous-commit-coords revision-id change-id)] (log/debug (format "get-previous-change: '%s'" (.getPath revision-uri)) [prev-rev-id c]) (cond (= :new c) ; the given revision+change is the first diff --git a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/errors.clj b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/errors.clj index dd4011fe..04df0395 100644 --- a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/errors.clj +++ b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/errors.clj @@ -16,6 +16,7 @@ (derive ::failure ::exception) (derive ::validation-failure ::exception) (derive ::not-found ::exception) +(derive ::input-data-error ::exception) (defn error-body-base [exception request] {:exception (.getClass exception) @@ -31,8 +32,8 @@ (defn validation-error-handler [exception-info request] {:status status/unprocessable-entity :body (assoc (error-body-base exception-info request) - :status "validation-error" - :message (ex-message exception-info))}) + :status "validation-error" + :message (str "Submitted data is invalid: " (ex-message exception-info)))}) (def exception-middleware (exception/create-exception-middleware @@ -46,6 +47,8 @@ ::validation-failure validation-error-handler + ::input-data-error validation-error-handler + ;; example of dispatch by class type ;java.lang.ArithmeticException (partial handler "Number Wang") diff --git a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/handlers.clj b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/handlers.clj index 84ef7b3e..250754e4 100644 --- a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/handlers.clj +++ b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/handlers.clj @@ -1,10 +1,13 @@ (ns tpximpact.datahost.ldapi.handlers (:require [clojure.tools.logging :as log] + [clojure.data.csv :as csv] + [clojure.string :as string] [grafter.matcha.alpha :as matcha] [ring.util.io :as ring-io] [tablecloth.api :as tc] [reitit.core :as rc] + [next.jdbc :as jdbc] [ring.util.request :as util.request] [ring.util.response :as util.response] [tpximpact.datahost.system-uris :as su] @@ -13,17 +16,23 @@ [tpximpact.datahost.ldapi.errors :as errors] [tpximpact.datahost.ldapi.store :as store] [tpximpact.datahost.ldapi.json-ld :as json-ld] - [tpximpact.datahost.ldapi.handlers.internal :as internal] [tpximpact.datahost.ldapi.resource :as resource] [tpximpact.datahost.ldapi.routes.shared :as shared] [tpximpact.datahost.ldapi.schemas.api :as s.api] + [tpximpact.datahost.ldapi.store.sql.interface :as sql.interface] + [tpximpact.datahost.ldapi.store.sql :as store.sql] [tpximpact.datahost.ldapi.util.data.validation :as data.validation] [tpximpact.datahost.ldapi.util.triples :refer [triples->ld-resource triples->ld-resource-collection]] + [tpximpact.datahost.ldapi.models.release :as m.release] [clojure.java.io :as io]) (:import (java.net URI) - (java.io ByteArrayInputStream ByteArrayOutputStream InputStream OutputStream))) + (java.io 
 ByteArrayInputStream
+           ByteArrayOutputStream
+           BufferedWriter
+           InputStream
+           OutputStream)))

 (defn- box [x]
   (if (coll? x) x [x]))

@@ -45,8 +54,61 @@
   (with-open [in ^InputStream (store/get-data change-store data-key)]
     (.transferTo in out-stream)))

+(defn- snapshot-maps-to-ring-io-writer
+  "Use as argument to `ring-io/piped-input-stream`"
+  [commit-uri observations ^OutputStream out-stream]
+  (try
+    (let [ks (keys (first observations))
+          out (BufferedWriter. (java.io.OutputStreamWriter. out-stream))
+          make-row (fn [record] (map #(get record %) ks))]
+      (csv/write-csv out [(map name ks)])
+      (doseq [batch (partition-all 100 (map make-row observations))]
+        (csv/write-csv out batch))
+      (.flush out))
+    (catch Exception ex
+      (log/warn ex "Failure to write CSV response" commit-uri)
+      (throw ex))))
+
+(defn- make-row-vec
+  [row]
+  (into [] (map #(str (nth row %))) (range (count row))))
+
+(defn- snapshot-arrays-to-ring-io-writer
+  "Use as argument to `ring-io/piped-input-stream`"
+  [data-source executor commit-uri ostore temp-table ^OutputStream out-stream]
+  (try
+    (.get (sql.interface/submit
+           executor
+           (fn materialise []
+             (with-open [conn (jdbc/get-connection data-source)]
+               (jdbc/with-transaction [tx conn]
+                 (store.sql/replay-commits tx {:store ostore
+                                               :commit-uri commit-uri
+                                               :snapshot-table temp-table})
+                 (let [out ^BufferedWriter (BufferedWriter. (java.io.OutputStreamWriter. out-stream))
+                       rs (m.release/stream-materialized-snapshot tx {:snapshot-table temp-table}
+                                                                  {:store ostore})]
+                   (doseq [s (interpose "," (:required-columns ostore))]
+                     (.write out s))
+                   (.newLine out)
+
+                   (reduce (fn [_ row]
+                             (let [v (make-row-vec row)]
+                               (doseq [s (interpose "," v)]
+                                 (.write out s)))
+                             (.newLine out))
+                           out
+                           rs)
+                   (.flush out)))))))
+    (catch Exception ex
+      (log/warn ex "Failure to write CSV response" commit-uri)
+      (throw ex))))
+
+(defn commit-data-to-ring-io-writer
+  [data-source executor])
+
 (defn csv-file-location->dataset [change-store key]
-  (with-open [in (store/get-data change-store key)]
+  (with-open [^java.io.Closeable in (store/get-data change-store key)]
     (data.validation/as-dataset in {:file-type :csv})))

 (defn op->response-code
@@ -74,13 +136,53 @@
       (as-json-ld {:status (op->response-code op)
                    :body jsonld-doc}))))

-(defn delete-dataset-series [triplestore change-store system-uris {{:keys [series-slug]} :path-params :as request}]
-  (if-let [_series (db/get-dataset-series triplestore (su/dataset-series-uri system-uris series-slug))]
+(defn- release-uris
+  "Returns the set of URIs for the series' releases that have a schema."
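+  ;; Implementation note: releases are read from the triplestore, indexed with
+  ;; Matcha, and filtered down to those that carry a dh:hasSchema triple; only
+  ;; those releases have observation tables in the SQL store that need dropping.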
+ [triplestore series-uri] + (let [issued-uri (cmp/expand :dcterms/issued) + has-schema-uri (cmp/expand :dh/hasSchema) + releases (->> (db/get-releases triplestore series-uri) + (matcha/index-triples) + (triples->ld-resource-collection) + (sort-by #(get % issued-uri)) + (reverse))] + (into #{} (comp (filter #(get % has-schema-uri)) + (map resource/id)) + releases))) + +(defn delete-dataset-series + [{:keys [triplestore change-store system-uris store-factory data-source db-executor]} + {{:keys [series-slug]} :path-params + {series-uri :dh/DatasetSeries} :datahost.request/uris + {_series :dh/DatasetSeries} :datahost.request/entities + :as _request}] + (let [release-uris (release-uris triplestore series-uri)] + (log/infof "delete-dataset-series: series-path=%s %s associated releases" + (.getPath ^URI series-uri) (count release-uris)) + ;; potentially delete the DB tables + (when (and data-source (seq release-uris)) + ;; TODO(rosado): replace `jdbc/get-connection` with a wrapper that accepts release-uri + (with-open [conn (jdbc/get-connection data-source)] + (try + (-> db-executor + (sql.interface/submit + (fn drop-release-tables [] + (doseq [uri release-uris + :let [params {:release-uri uri}]] + (log/debug {:delete-dataset-series/release-uri uri}) + (m.release/drop-release-tables-and-data conn params {})))) + (.get)) + (catch Throwable err + (log/debug {:delete-dataset-series/release-uris release-uris}) + (log/warn (ex-cause err) "delete-dataset-series: sql cleanup failed") + (throw err))) + (log/debugf "delete-dataset-series: series-path=%s release tables deleted" + (.getPath ^URI series-uri)))) + (let [orphaned-change-keys (db/delete-series! triplestore system-uris series-slug)] (doseq [change-key orphaned-change-keys] (store/-delete change-store change-key)) - {:status 204}) - (errors/not-found-response request))) + {:status 204}))) (defn get-release [triplestore _change-store system-uris @@ -120,16 +222,24 @@ :body "Series for this release does not exist"})) (defn post-release-schema - [clock triplestore system-uris {path-params :path-params - {schema-file :body} :parameters :as request}] + "Saves the schema in the triplestore and creates necessary tables in SQL db." + [{:keys [clock triplestore system-uris db-executor data-source] :as sys} + {{schema-file :body} :parameters + {release-uri :dh/Release} :datahost.request/uris + :as request}] {:pre [schema-file]} - (if (db/resource-exists? triplestore (su/dataset-series-uri* system-uris path-params)) - (let [incoming-jsonld-doc schema-file - api-params (get-api-params request)] - (as-> (db/upsert-release-schema! clock triplestore system-uris incoming-jsonld-doc api-params) insert-result - (as-json-ld {:status (op->response-code (:op insert-result)) - :body (:jsonld-doc insert-result)}))) - (errors/not-found-response request))) + (let [incoming-jsonld-doc schema-file + api-params (get-api-params request) + insert-result (db/upsert-release-schema! 
clock triplestore system-uris incoming-jsonld-doc api-params) + row-schema (data.validation/make-row-schema-from-json incoming-jsonld-doc)] + (when data-source + (with-open [conn (jdbc/get-connection data-source)] + (-> db-executor + (sql.interface/submit + #(m.release/create-release-tables conn {:release-uri release-uri :row-schema row-schema} {})) + (.get)))) + (as-json-ld {:status (op->response-code (:op insert-result)) + :body (:jsonld-doc insert-result)}))) (defn- get-schema-id [matcha-db] ((grafter.matcha.alpha/select-1 [?schema] @@ -159,15 +269,18 @@ [rev-id] (let [path (.getPath ^URI rev-id)] (try - (Long/parseLong (-> (re-find #"^.*/([^/]*)$" path) next first)) + (Long/parseLong (-> (re-find #"^.*\/revision\/(\d+)\/{0,1}.*$" path) next first)) (catch NumberFormatException ex (throw (ex-info (format "Could not extract revision number from given id: %s" rev-id) {:revision-id rev-id} ex)))))) (defn get-revision - [triplestore - change-store - system-uris + [{:keys [triplestore + system-uris + store-factory + data-source + db-executor] + :as _system} {{revision :dh/Revision} :datahost.request/entities {release-uri :dh/Release} :datahost.request/uris {:strs [accept]} :headers @@ -177,14 +290,23 @@ (-> {:status 200 :headers {"content-type" "text/csv" "content-disposition" "attachment ; filename=revision.csv"} - :body (let [change-infos (db/get-changes-info triplestore release-uri (revision-number (resource/id revision-ld)))] - (if (empty? change-infos) - "" ;TODO: return table header here, need to get schema first`` - (let [key (:snapshotKey (last change-infos))] - (assert (string? key)) - (when (nil? key) - (throw (ex-info "No snapshot reference for revision" {:revision revision}))) - (ring-io/piped-input-stream (partial change-store-to-ring-io-writer change-store key)))))} + :body (let [release-schema (db/get-release-schema triplestore release-uri) + change-infos (db/get-changes-info triplestore release-uri (revision-number (resource/id revision-ld)))] + (if (empty? change-infos) + "" ;TODO: return table header here, need to get schema first`` + ;; ELSE + (let [ostore (store-factory release-uri (data.validation/make-row-schema release-schema)) + commit-uri (-> change-infos last :snapshotKey (URI.)) + ;; TODO: one temp table per request - this obviously can cause disk usage + ;; to grow significantly if not capped. + temp-table (str "test_" (random-uuid))] + (ring-io/piped-input-stream + (partial snapshot-arrays-to-ring-io-writer + data-source + db-executor + commit-uri + ostore + temp-table)))))} (shared/set-csvm-header request)) (as-json-ld {:status 200 @@ -256,118 +378,100 @@ {:status 422 :body "Release for this revision does not exist"}))) -(defn- validate-incoming-change-data - "Returns a map {:dataset ?DATASET (optional-key :error-response) ..., row-schema MALLI-SCHEMA}, - containing :error-response entry when validation failed." - [release-schema appends] - (let [row-schema (data.validation/make-row-schema release-schema) - {:keys [explanation dataset]} - (try - (->(data.validation/as-dataset appends {:enforce-schema row-schema}) - (data.validation/validate-dataset row-schema {:fail-fast? true})) - (catch clojure.lang.ExceptionInfo ex - (if (= ::data.validation/dataset-creation (-> ex ex-data :type)) - {:explanation (ex-message ex)} - (throw ex))))] - (cond-> {:row-schema row-schema} - (some? dataset) (assoc :dataset dataset) - (some? 
explanation) (assoc :error-response - {:status 400 - :body {:message "Invalid data" - :explanation explanation - :column-names (data.validation/row-schema->column-names row-schema)}})))) - (defn ->byte-array-input-stream [input-stream] (with-open [intermediate (ByteArrayOutputStream.)] (io/copy input-stream intermediate) (ByteArrayInputStream. (.toByteArray intermediate)))) +(defn ->tmp-file + [^InputStream input-stream] + (let [tmp (java.io.File/createTempFile "inputcsv" ".tmp")] + (with-open [out (java.io.FileOutputStream. tmp)] + (io/copy input-stream out)) + tmp)) + (defn post-change - [triplestore - change-store - system-uris + [{:keys [triplestore system-uris data-source store-factory] :as sys} change-kind {router :reitit.core/router - {:keys [series-slug release-slug revision-id] :as path-params} :path-params - query-params :query-params + {:keys [series-slug release-slug revision-id]} :path-params + _query-params :query-params appends :body - {release-uri :dh/Release :as request-uris} :datahost.request/uris + {:keys [release-schema]} :datahost.request/entities + {^URI release-uri :dh/Release :as request-uris} :datahost.request/uris :as request}] - (let [;; validate incoming data - release-schema (db/get-release-schema triplestore release-uri) - ;; If there's no release-schema, then no validation happens, and db/insert - ;; is a NOOP. This fails further down in the `let` body in - ;; #'internal/post-change--generate-csv-snapshot - ;; We don't need to proceed further if there's no release-schema! - _ (when (nil? release-schema) - (throw (ex-info (str "No release schema found for: " release-uri) - {:release-uri release-uri}))) - appends ^InputStream (->byte-array-input-stream appends) - insert-req (store/make-insert-request! change-store appends) - {validation-err :error-response - row-schema :row-schema - change-ds :dataset} (some-> release-schema (validate-incoming-change-data appends)) - _ (.reset appends) + (let [^java.io.File appends-file (with-open [^java.io.Closeable a appends] + (->tmp-file a)) + insert-req (store.sql/make-observation-insert-request! release-uri appends-file change-kind) content-type (util.request/content-type request) ;; insert relevant triples {:keys [inserted-jsonld-doc change-id - change-uri - message]} (when-not validation-err - (db/insert-change! triplestore - system-uris - {:api-params (assoc (get-api-params request) - :format content-type) - :ld-root (su/rdf-base-uri system-uris) - :store-key (:key insert-req) - :datahost.change/kind change-kind - :datahost.request/uris request-uris}))] - (log/info (format "post-change: '%s' validation: found-schema? = %s, change-valid? = %s, insert-ok? = %s" - change-uri (some? release-schema) (nil? validation-err) (nil? message))) - (cond - (some? validation-err) validation-err - - (some? 
 message) {:status 422 :body message}
-
-      :else
-      (do
-        ;; store the change
-        (store/request-data-insert change-store insert-req)
-        (log/debug (format "post-change: '%s' stored-change: '%s'" (.getPath change-uri) (:key insert-req)))
-
-        ;; generate and store the dataset snapshot
-        (let [{:keys [new-snapshot-key] :as snapshot-result}
-              (internal/post-change--generate-csv-snapshot
-               {:triplestore triplestore
-                :change-store change-store
-                :system-uris system-uris}
-               {:path-params path-params
-                :change-kind change-kind
-                :change-id change-id
-                :dataset change-ds
-                :row-schema row-schema
-                "dcterms:format" content-type})]
-          (log/debug (format "post-change: '%s' stored snapshot" (.getPath change-uri))
-                     {:new-snapshot-key new-snapshot-key})
-          (db/tag-with-snapshot triplestore change-uri snapshot-result))
-
-        (as-json-ld {:status 201
-                     :headers {"Location" (-> (rc/match-by-name router
-                                                                 :tpximpact.datahost.ldapi.router/commit
-                                                                 {:series-slug series-slug
-                                                                  :release-slug release-slug
-                                                                  :revision-id revision-id
-                                                                  :commit-id change-id})
-                                              (rc/match->path))}
-                     :body inserted-jsonld-doc})))))
+                ^URI change-uri
+                message]} (db/insert-change! triplestore
+                                             system-uris
+                                             {:api-params (assoc (get-api-params request)
+                                                                 :format content-type)
+                                              :store-key (:key insert-req)
+                                              :datahost.change/kind change-kind
+                                              :datahost.request/uris request-uris})]
+    (log/info (format "post-change: '%s' insert-ok? = %s" change-uri (nil? message)))
+    (try
+      (cond
+        (some? message) {:status 422 :body message}
+
+        :else
+        (do
+          ;; store the change
+          (log/debug (format "post-change: '%s' stored-change: '%s'" (.getPath change-uri) (:key insert-req)))
+          (let [row-schema (data.validation/make-row-schema release-schema)]
+            (when (and data-source store-factory)
+              (try
+                (jdbc/with-transaction [tx (jdbc/get-connection data-source)]
+                  (let [obs-store (store-factory release-uri row-schema)
+                        tx-store (assoc obs-store :db tx)
+                        insert-req (assoc insert-req :commit-uri change-uri)
+                        {import-status :status} (store.sql/execute-insert-request tx-store insert-req)]
+                    (log/debug "import-status: " import-status)
+                    ;; TODO(rosado): complete-import should probably take :status
+
+                    (when (= :import.status/success import-status)
+                      (store.sql/complete-import tx-store insert-req))
+
+                    (if (store.sql/create-commit? import-status)
+                      (store.sql/create-commit tx-store insert-req)
+                      (throw (ex-info (format "Could not create commit: %s" change-uri)
+                                      {:commit-uri change-uri :import-status import-status})))))
+                (catch Exception ex
+                  ;; TODO(rosado): handle error, tag the change entity as failed/error in the triplestore (or delete it)
+                  (throw ex))))
+            (db/tag-with-snapshot triplestore change-uri {:new-snapshot-key (str change-uri)})
+
+            (as-json-ld {:status 201
+                         :headers {"Location" (-> (rc/match-by-name router
+                                                                     :tpximpact.datahost.ldapi.router/commit
+                                                                     {:series-slug series-slug
+                                                                      :release-slug release-slug
+                                                                      :revision-id revision-id
+                                                                      :commit-id change-id})
+                                                  (rc/match->path))}
+                         :body inserted-jsonld-doc}))))
+      (finally
+        (.delete appends-file)))))

 (defn change->csv-stream [change-store change]
   (let [appends (get change (cmp/expand :dh/updates))]
     (when-let [dataset (csv-file-location->dataset change-store appends)]
       (write-dataset-to-outputstream dataset))))

-(defn get-change [triplestore change-store system-uris {{:keys [series-slug release-slug revision-id commit-id]} :path-params
-                                                        {:strs [accept]} :headers :as request}]
+(defn get-change
+  "Returns data for commit (and not the snapshot dataset)."
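+  ;; Note: this handler streams back the raw CSV that was appended for a single
+  ;; commit (via the change-store). Materialised snapshots of a whole revision
+  ;; are produced separately; see `get-revision` and `store.sql/replay-commits`.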
+  [triplestore
+   change-store
+   system-uris
+   {{:keys [series-slug release-slug revision-id commit-id]} :path-params
+    {:strs [accept]} :headers :as request}]
   (if-let [change (->> (su/commit-uri system-uris series-slug release-slug revision-id commit-id)
                        (db/get-change triplestore)
                        matcha/index-triples
diff --git a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/json_ld.clj b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/json_ld.clj
index cb767c84..ba1a6737 100644
--- a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/json_ld.clj
+++ b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/json_ld.clj
@@ -32,9 +32,9 @@
   "Takes an EDN JSON-LD data structure and converts them to a compacted
   jakarta.json.JsonObject, which can be further manipulated or written
   to a string via .toString"
-  [json-ld context]
-  (-> (->json-document json-ld)
-      (JsonLd/compact (->json-document context))
-      (.compactArrays true)
-      (.compactToRelative true)
-      (.get)))
+  (^jakarta.json.JsonObject [json-ld context]
+   (-> (->json-document json-ld)
+       (JsonLd/compact (->json-document context))
+       (.compactArrays true)
+       (.compactToRelative true)
+       (.get))))
diff --git a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/metrics.clj b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/metrics.clj
index cfc1b470..bca8f71d 100644
--- a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/metrics.clj
+++ b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/metrics.clj
@@ -39,6 +39,8 @@
 (deftimer reg ["db" "read" "get-changes-info"])
 (deftimer reg ["db" "write" "delete-series!"])

+(deftimer reg ["import" "write" "insert-observations"])
+
 ;; 77 get-dataset-series - read
 ;; 112 get-release-schema-statements - read
 ;; 133 get-revision - read
diff --git a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/models/release.clj b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/models/release.clj
new file mode 100644
index 00000000..72420201
--- /dev/null
+++ b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/models/release.clj
@@ -0,0 +1,278 @@
+(ns tpximpact.datahost.ldapi.models.release
+  "Functions in this namespace are a mix of autogenerated (via HugSQL)
+  and hand-written utility functions (mostly to hide tedious params
+  wrangling).
+
+  Note: Avoid putting complex logic here. Dealing with errors, thread
+  pools, etc. belongs somewhere else.
+
+  ## Conventions
+
+  We're using the convention of aliasing the name of the dynamically selected
+  table (e.g. the release's observations table) as 'selected' in our SQL code."
+  (:require [clojure.tools.logging :as log]
+            [malli.core :as m]
+            [malli.error :as m.error]
+            [integrant.core :as ig]
+            [hugsql.core :as hugsql]
+            [hugsql.adapter.next-jdbc :as next-adapter]
+            [hugsql.parameters :refer [identifier-param-quote]]
+            [next.jdbc :as jdbc]
+            [next.jdbc.result-set] ;; used below via fully-qualified name
+            [tpximpact.datahost.ldapi.schemas.common :as s.common]
+            [tpximpact.datahost.ldapi.store.sql.interface :as sql.interface]
+            [tpximpact.datahost.ldapi.util.name-munging :as name-munging]))
+
+(def ^:private adapter (next-adapter/hugsql-adapter-next-jdbc))
+(def ^:private sql-file "tpximpact/datahost/ldapi/models/sql/release.sql")
+
+(defn- setup-queries! []
+  (hugsql/def-db-fns sql-file {:adapter adapter :quoting :ansi})
+  (hugsql/def-sqlvec-fns sql-file {:adapter adapter :quoting :ansi}))
+
+(setup-queries!)
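+
+;; A small REPL sketch (not wired into the running system) of how the pieces
+;; in this namespace fit together, assuming the in-memory SQLite spec used in
+;; test-system.edn:
+(comment
+  (with-open [conn (jdbc/get-connection (jdbc/get-datasource "jdbc:sqlite::memory:"))]
+    ;; create the shared `imports`/`commits` tables and their indices
+    (create-common-tables conn)
+    ;; `db-ok?` probes the `imports` table, so it should now return true
+    (db-ok? conn)))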
+
+(defn- release-uri->release-table [release-uri prefix] (->> release-uri name-munging/sanitize-name (str prefix)))
+
+(defn- create-table*
+  [db f {:keys [row-schema] :as params} options]
+  (let [{:keys [original-column-names col-spec]} (name-munging/observation-column-defs-sql params)
+        _ (assert (pos? (count col-spec)))
+        col-spec (mapv (fn [tup] {:column (nth tup 1) :spec (nth tup 2)}) col-spec)
+        col-spec (assoc-in col-spec [(dec (count col-spec)) :comma?] false)]
+    (f db (assoc params :column-spec (map column-spec col-spec)) options)))
+
+(defn create-observations-table
+  ([db params] (create-observations-table db params {:quoting :ansi}))
+  ([db {:keys [row-schema] :as params} options]
+   (create-table* db -create-observations-table params options)))
+
+(defn create-observations-import-table
+  [db {:keys [row-schema] :as params} options]
+  (create-table* db -create-observations-import-table params options))
+
+(defn create-common-tables
+  [^java.sql.Connection db]
+  (create-imports-table db)
+  (create-commits-table db)
+  (create-commits-by-uid-index db)
+  (create-commits-by-import-uid-index db)
+  (create-commits-by-revision-id-index db)
+  (create-commits-by-change-id-index db))
+
+(defn create-release-tables
+  "Creates tables for the release specified by URI and row-schema."
+  [db {:keys [row-schema release-uri] :as params} options]
+  (assert (not (:release-id params)))
+  (let [params (assoc params :release_id (name-munging/sanitize-name release-uri))]
+    (create-observations-import-table db params options)
+    (create-observations-table db params options)))
+
+(def ImportObservationParams
+  (m/schema [:map
+             [:release_id :string]
+             [:import_id :int]
+             [:column-names [:sequential :any]]
+             [:observations [:sequential :any]]]))
+
+(defn import-observations
+  "Adds observations to the 'import' table.
+
+  See: [[ImportObservationParams]]"
+  [db params]
+  (assert (m/validate ImportObservationParams params))
+  (-import-observations db params))
+
+(defn- sanitize-params
+  [params {store :store}]
+  (cond-> params
+    store (assoc :select-columns (sql.interface/-make-select-observations-as store "selected"))))
+
+(def SelectObservationsArgs
+  (m/schema [:tuple
+             [:fn {:error/message "DB connection is required"} some?]
+             [:map
+              ;;[:release-uri :datahost/uri]
+              [:import-uid :any]]
+             [:map
+              [:store {:optional false}
+               [:and
+                [:fn #(satisfies? sql.interface/SQLStoreCompatible %)]
+                [:map
+                 [:release-uri :datahost/uri]]]]]]
+            {:registry s.common/registry}))
+
+(def ^:private select-observation-args-valid? (m/validator SelectObservationsArgs))
+
+(defn- validate-select-observations-args
+  [db+params+opts]
+  (when-not (select-observation-args-valid? db+params+opts)
+    (throw (ex-info "invalid arguments" {:explanation (->> db+params+opts
+                                                           (m/explain SelectObservationsArgs)
+                                                           (m.error/humanize))}))))
+
+(defn select-imported-observations
+  [db {:keys [import-uid] :as params} {{:keys [release-uri]} :store :as options}]
+  (validate-select-observations-args [db params options])
+  (let [imports-table (release-uri->release-table release-uri "import::")
+        params (assoc params
+                      :imports-table imports-table
+                      :import-id (import-id (assoc params :release-uri release-uri)))]
+    (-select-imported-observations db (sanitize-params params options))))
+
+(defn select-observations
+  [db params {{:keys [release-uri]} :store :as options}]
+  (validate-select-observations-args [db params options])
+  (let [observations-table (release-uri->release-table release-uri "observations::")
+        params (assoc params
+                      :observations-table observations-table
+                      :import-id (import-id (assoc params :release-uri release-uri)))]
+    (-select-observations db (sanitize-params params options) options)))
+
+(defn complete-import--copy-records
+  [db {:keys [import-uid] :as params} {{:keys [release-uri] :as store} :store :as options}]
+  (let [observations-table (release-uri->release-table release-uri "observations::")
+        imports-table (release-uri->release-table release-uri "import::")
+        params (assoc params
+                      :observations-table observations-table
+                      :imports-table imports-table
+                      :select-columns (sql.interface/-make-select-observations-as store nil)
+                      :import-id (import-id {:release-uri release-uri :import-uid import-uid}))]
+    (-complete-import--copy-records db params options)))
+
+(defn complete-import--delete-import-records
+  [db {:keys [import-uid] :as params} {{release-uri :release-uri :as store} :store :as options}]
+  (let [params (assoc params :imports-table (release-uri->release-table release-uri "import::"))
+        params (assoc params :import-id (import-id (assoc params :release-uri release-uri)))]
+    (-complete-import--delete-import-records db params options)))
+
+(defn select-commit-observations
+  "Get observations belonging to a commit specified by change-uri.
+
+  Params:
+  - change-uri
+  - snapshot-data? - optional, defaults to false. Selects only the
+    columns needed for building a snapshot of the dataset."
+  [db {:keys [change-uri snapshot-data?] :as params
+       :or {snapshot-data? false}}
+   {:keys [store] :as options}]
+  (assert store)
+  (let [release-uri (:release-uri store)
+        observations-table (release-uri->release-table release-uri "observations::")
+        params (assoc params :release-uri release-uri :observations-table observations-table)
+        params (assoc params :-commit-info (-commit-info params options))
+        params (cond-> params
+                 (not snapshot-data?) (sanitize-params options)
+                 snapshot-data?
(assoc :select-columns [["id" "id"] + ["synth_id" "synth_id"] + ["coords" "coords"]]))] + (-select-commit-observations db params options))) + +(defn- commit-op-params + [{:keys [commit-uri snapshot-table] :as params} {:keys [store] :as options}] + (let [release-uri (:release-uri store) + params (assoc params :release-uri release-uri) + params (assoc params :-commit-info (-commit-info params options)) + params (assoc params :observations-table (release-uri->release-table release-uri "observations::"))] + params)) + +(defn select-commit-observations-into + [db {:keys [commit-uri snapshot-table] :as params} {:keys [store] :as options}] + (assert snapshot-table) + (let [params (commit-op-params params options)] + (-select-observation-snapshot-data db params options))) + +(defn commit-op-append + [db {:keys [commit-uri snapshot-table] :as params} {:keys [store] :as options}] + (assert snapshot-table) + (let [params (commit-op-params params options)] + (-commit-op-append db params options))) + +(defn commit-op-retract + [db {:keys [commit-uri snapshot-table] :as params} {:keys [store] :as options}] + (assert snapshot-table) + (let [params (commit-op-params params options)] + (-commit-op-retract db params options))) + +(defn commit-op-correct* + "Note: only performs the 2nd step of 'correction': appending the updated rows + to a snapshot table that already had the updated rows removed." + [db {:keys [commit-uri snapshot-table] :as params} {:keys [store] :as options}] + (assert snapshot-table) + (let [params (commit-op-params params options)] + (-commit-op-correct* db params options))) + +(defn populate-corrections-scratch-table + [db {:keys [commit-uri snapshot-table table-name] :as params} {:keys [store] :as options}] + (assert snapshot-table) + (let [params (commit-op-params params options)] + (-populate-corrections-scratch-table db params options))) + +(defn commit-op-correct--insert-updated-records + [db {:keys [commit-uri snapshot-table] :as params} {:keys [store] :as options}] + (assert snapshot-table) + (let [params (commit-op-params params options)] + (-commit-op-correct--insert-updated-records db params options))) + +(defn get-commit-ids + "Returns :id, :parent_id, :op, :uid" + [db {:keys [commit-uri release-uri] :as params}] + (assert release-uri) + (assert commit-uri) + (-get-commit-ids db (assoc params :all-commit-ids (all-commit-ids {:uid commit-uri :release-uri release-uri})))) + +(defn- materialize-snapshot-params+options + "Returns a tuple of [params options]" + [ {:keys [snapshots-table] :as params} {:keys [store] :as options}] + (let [{:keys [release-uri]} store + params (assoc params + :select-columns (sql.interface/-make-select-observations-as store "selected") + :observations-table (release-uri->release-table release-uri "observations::")) + options (cond-> options + (not (contains? options :builder-fn)) + (assoc :builder-fn next.jdbc.result-set/as-unqualified-arrays))] + [params options])) + +(defn materialize-snapshot + [db {:keys [snapshots-table] :as params} {:keys [store] :as options}] + (let [[params options] (materialize-snapshot-params+options params options)] + (jdbc/execute! 
db (-materialise-snapshot-sqlvec params options) options))) + +(defn stream-materialized-snapshot + [db {:keys [snapshots-table] :as params} {:keys [store] :as options}] + (let [[params options] (materialize-snapshot-params+options params options)] + (jdbc/plan db (-materialise-snapshot-sqlvec params options) options))) + +(defn drop-release-tables-and-data + [db {:keys [release-uri] :as params} options] + (let [imp-table (release-uri->release-table release-uri "import::") + obs-table (release-uri->release-table release-uri "observations::")] + (-drop-table db (assoc params :table-name imp-table) options) + (-drop-table db (assoc params :table-name obs-table) options) + (jdbc/execute-one! db ["delete from imports where release_uri = ? " release-uri]) + (jdbc/execute-one! db ["delete from commits where release_uri = ?" release-uri]))) + +(defmethod ig/init-key ::queries [_ _] + (setup-queries!)) + +(defmethod ig/halt-key! ::queries [_ _]) + +(defn db-ok? + [conn] + (try + {:result (jdbc/execute-one! conn ["select count(*) from imports"])} ;TODO: make this work on other DBs + true + (catch java.sql.SQLException _ex + false))) + +;;; 'data-source' is optional + +(defmethod ig/init-key ::common-tables [_ {{:keys [db-executor db-config data-source]} :db}] + (let [db-executor ^java.util.concurrent.ExecutorService db-executor + {:keys [spec user password]} db-config + connection (if data-source + (jdbc/get-connection data-source) + (jdbc/get-connection spec user password))] + (when-not (db-ok? connection) + (.get (sql.interface/submit db-executor #(create-common-tables connection)))))) + +(defmethod ig/halt-key! ::common-tables [_ _]) diff --git a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/models/sql/release.sql b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/models/sql/release.sql new file mode 100644 index 00000000..53653b1b --- /dev/null +++ b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/models/sql/release.sql @@ -0,0 +1,315 @@ +-- :snip column-spec +:i:column :sql:spec --~(when (:comma? params true) ",") + +-- :snip import-id +(select id from imports where import_uid = :import-uid and release_uri = :release-uri) + +-- :snip all-commit-ids +with recursive all_commits as ( + select id, parent_id, op, uid + from commits + where id = (select id from commits where uid = :uid) + union + select c.id, c.parent_id, c.op, c.uid + from commits c + inner join all_commits ac on c.id = ac.parent_id + where c.release_uri = :release-uri +) + +-- :snip -commit-info +-- :doc get joined import and commit data for :change-uri. Returns table of [import_id, commits.op] +( + select i.id as import_id, c.op + from imports i + inner join commits c + on i.import_uid = c.import_uid + where c.release_uri = :release-uri + and i.release_uri = :release-uri + and +--~(if-let [commit-id (:commit-id params)] "c.id = :commit-id" "c.uid = :commit-uri") +) + +-- :name -create-observations-table :! :1 +/* :require [hugsql.parameters :refer [identifier-param-quote]] */ +create table +--~(identifier-param-quote (str "observations::" (:release_id params)) options) +( + id integer primary key, + import_id integer not null, + coords integer not null, + synth_id integer not null, + :snip*:column-spec, + unique (import_id,coords) +) + +-- :name -create-observations-import-table :! 
+
+-- :name -create-observations-import-table :! :1
+create table
+--~(identifier-param-quote (str "import::" (:release_id params)) options)
+(
+  --id integer generated always as identity primary key, -- H2
+  id integer primary key autoincrement, --sqlite
+  import_id integer not null,
+  coords integer not null,
+  synth_id integer not null,
+  :snip*:column-spec,
+  unique (import_id,coords)
+)
+
+-- :name create-temporary-snapshot-table :! :1
+-- :doc Creates a table with [id, import_id, coords, synth_id] columns
+create temp table :i:table-name
+-- ---(identifier-param-quote (str (:table-name params)) options)
+(
+  -- we no longer want to generate the id, we will supply it
+  id integer primary key not null,
+  import_id integer not null,
+  coords integer not null,
+  synth_id integer not null,
+  new_id integer,
+  unique (import_id,coords)
+)
+
+-- :name create-imports-table :! :1
+create table if not exists imports
+(
+-- id integer generated always as identity primary key, -- H2
+  id integer primary key autoincrement, -- sqlite
+  import_uid varchar(128) not null,
+  -- status: 'created' | 'failed' | 'completed'
+  status varchar(128) not null,
+  created timestamp not null,
+  updated timestamp,
+  release_uri not null
+)
+
+-- :name -insert-import :! :1
+with import_ids as :snip:import-id
+insert into imports (created, status, import_uid, release_uri)
+select CURRENT_TIMESTAMP, 'started', :import-uid, :release-uri
+where not exists (select 1 from import_ids)
+--(select 1 from imports
+-- where import_uid = :import-uid and release_uri = :release-uri)
+
+-- :name select-import :! :1
+select * from imports
+where import_uid = :import-uid and release_uri =
+--~(format "'%s'" (:release-uri (:store options)))
+
+-- :name update-import-status :! :1
+with import_ids as :snip:import-id
+update imports
+set status = :status, updated = CURRENT_TIMESTAMP
+where id = (select id from import_ids)
+and status != 'completed'
+
+-- :name create-commits-table :! :1
+create table if not exists commits (
+  -- id integer generated always as identity primary key, -- H2
+  id integer primary key autoincrement,
+  uid text not null,
+  import_uid text not null, -- TODO: change to FK?
+  revision_id integer not null,
+  change_id integer not null, -- 'commit-id in triplestore'
+  parent_id integer,
+  op integer not null,
+  release_uri not null, -- TODO: revise, column probably not needed
+  unique (uid)
+);
+
+-- :name create-commits-by-uid-index :! :1
+create index if not exists commits_by_uid on commits (uid);
+
+-- :name create-commits-by-import-uid-index :! :1
+create index if not exists commits_by_import_uid on commits (import_uid);
+
+-- :name create-commits-by-revision-id-index :! :1
+create index if not exists commits_by_revision_id on commits (revision_id);
+
+-- :name create-commits-by-change-id-index :! :1
+create index if not exists commits_by_change_id on commits (change_id);
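+
+-- Illustration (plain comment): `insert-commit` below resolves a new commit's
+-- parent as the most recent existing commit that is either (same revision,
+-- change_id - 1) or belongs to an earlier revision. E.g. inserting
+-- (revision 2, change 1) picks the last change of revision 1 as the parent.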
+
+-- :name insert-commit :! :1
+-- :doc Inserts a commit record into given release
+with parent_commits as (
+  select id, revision_id, change_id
+  from commits
+  where release_uri = :release-uri
+  and ((revision_id = :revision-id and change_id = :change-id - 1)
+       or
+       (revision_id < :revision-id))
+  -- probably can add a guard against import_uid && op (same change twice in a row)
+  order by revision_id desc, change_id desc
+  limit 1
+)
+insert into commits (uid, import_uid, parent_id, revision_id, change_id, op, release_uri)
+select
+  :uid,
+  :import_uid,
+  (select id from parent_commits),
+  :revision-id,
+  :change-id,
+  :op,
+  :release-uri
+where not exists (select 1 from commits
+                  where
+                  revision_id = :revision-id
+                  and change_id = :change-id
+                  and uid = :uid
+                  limit 1)
+
+-- :name -import-observations :! :1
+insert into
+--~(identifier-param-quote (str "import::" (:release_id params)) options)
+(import_id, coords, synth_id, :i*:column-names)
+values :t*:observations
+
+-- :name -select-imported-observations :? :*
+with import_ids as :snip:import-id
+select :i*:select-columns
+from :i:imports-table as selected
+where selected.import_id = (select id from import_ids)
+
+-- :name -select-observations :? :*
+with import_ids as :snip:import-id
+select :i*:select-columns
+from :i:observations-table as selected
+where selected.import_id = (select id from import_ids)
+
+-- :name -complete-import--copy-records :! :n
+with import_id as :snip:import-id
+insert into :i:observations-table (import_id, coords, synth_id, :i*:insert-columns)
+select (select id from import_id), coords, synth_id, :i*:select-columns
+from :i:imports-table
+
+-- :name -complete-import--delete-import-records
+with import_ids as :snip:import-id
+delete from :i:imports-table where import_id = (select id from import_ids)
+
+-- :name -get-commit-ids :? :*
+-- :doc returns {:id int, :parent_id int, :op int, :uid string}
+:snip:all-commit-ids
+select id, parent_id, op, uid from all_commits
+
+-- :name -commit-op-append :? :*
+-- :doc Appends the commit's imported rows into :snapshot-table, skipping rows whose coords already exist in the snapshot
+with commit_info as :snip:-commit-info,
+duplicates as (
+  select
+  o.id as dup_id,
+  o.coords as dup_coords
+  from :i:observations-table o
+  inner join :i:snapshot-table s
+  on o.coords = s.coords
+  where o.import_id != s.import_id
+),
+valid_ids as (
+  select id
+  from :i:observations-table
+  where import_id = (select import_id from commit_info )
+  except
+  select dup_id from duplicates
+)
+insert into :i:snapshot-table (id, import_id, coords, synth_id)
+select o.id, (select import_id from commit_info) as import_id, o.coords, o.synth_id
+from :i:observations-table o
+inner join valid_ids ids
+on ids.id = o.id
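+
+-- Illustration (plain comment): replaying [append {a,b,c}] then [append {d,a}]
+-- leaves the snapshot as {a,b,c,d}. The `duplicates` CTE above drops the
+-- second import's `a` row because its coords already exist in the snapshot.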
+
+-- :name -commit-op-retract :? :*
+-- :doc Deletes rows from :snapshot-table whose [coords, synth_id] match rows in the commit's import
+with commit_info as :snip:-commit-info,
+coords_to_remove as (
+  select coords, synth_id
+  from :i:snapshot-table s
+  intersect
+  select coords, synth_id
+  from :i:observations-table o
+  where o.import_id = (select import_id from commit_info)
+)
+delete from :i:snapshot-table
+where coords in (select coords from coords_to_remove)
+
+-- :name -commit-op-correct*
+-- :doc Marks snapshot rows whose measure changed by setting new_id to the id of the updated observation
+with commit_info as :snip:-commit-info,
+measures_to_update as (
+  select o.id, o.coords, o.synth_id
+  from :i:observations-table o
+  inner join :i:snapshot-table s
+  on s.coords = o.coords
+  where o.import_id = (select import_id from commit_info)
+  and s.synth_id != o.synth_id
+)
+update :i:snapshot-table
+set new_id = (select id from measures_to_update m where m.coords = coords)
+
+-- :name -create-corrections-scratch-table
+-- :doc Creates a temp table mapping old (stale) observation ids to their corrected replacements
+create temp table :i:table-name (
+  old_id integer not null,
+  new_id integer not null,
+  unique (old_id, new_id)
+)
+
+-- :name -populate-corrections-scratch-table
+-- :doc Fills the scratch table with [old_id, new_id] pairs for snapshot rows whose synth_id differs from the commit's import
+with commit_info as :snip:-commit-info,
+measures_to_update as (
+  select s.id as old_id, o.id as new_id, o.coords, o.synth_id
+  from :i:observations-table o
+  inner join :i:snapshot-table s
+  on s.coords = o.coords
+  where o.import_id = (select import_id from commit_info)
+  and s.synth_id != o.synth_id
+)
+insert into :i:table-name (old_id, new_id)
+select old_id, new_id
+from measures_to_update
+
+-- :name -commit-op-correct--delete-stale-records
+-- :doc Removes snapshot rows listed as old_id in the corrections scratch table
+delete from :i:snapshot-table
+where id in (select old_id from :i:table-name)
+
+-- :name -commit-op-correct--insert-updated-records
+-- :doc Inserts the corrected rows (new_id in the scratch table) into the snapshot
+with commit_info as :snip:-commit-info
+insert into :i:snapshot-table (id, import_id, coords, synth_id)
+select scratch.new_id, o.import_id, o.coords, o.synth_id
+from :i:table-name scratch
+inner join :i:observations-table o
+on o.id = scratch.new_id
+where o.import_id = (select import_id from commit_info)
+
+-- :name -select-commit-observations :? :*
+-- :doc Get observations belonging to a commit specified by :commit-uri
+with commit_info as :snip:-commit-info, -- gives us: [import_id, commits.op]
+selected as (
+  select :i*:select-columns
+  from :i:observations-table as selected
+  where import_id = (select import_id from commit_info)
+)
+select * from selected
+join commit_info c
+on import_id = c.import_id
+
+-- :name -select-observation-snapshot-data
+-- :doc needs: :snapshot-table, :release-uri, :commit-uri, and snippets: :-commit-info
+with commit_info as :snip:-commit-info
+insert into :i:snapshot-table (id, import_id, coords, synth_id)
+select id, import_id, coords, synth_id
+from :i:observations-table
+where import_id = (select import_id from commit_info)
+
+-- :name -materialise-snapshot
+-- :doc Joins the snapshot ids back onto the observations table to produce the full dataset rows
+select :i*:select-columns -- X as selected.X
+from :i:snapshot-table s
+inner join :i:observations-table selected
+on s.id = selected.id
+order by s.id asc
+
+-- :name -drop-table
+drop table :i:table-name
+
diff --git a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/router.clj b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/router.clj
index f02adf9d..fb6fe10e 100644
--- a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/router.clj
+++ b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/router.clj
@@ -264,7 +264,7 @@ In Datahost it's possible for some resources to only have metadata or data representations available.
This should be documented in the API specifications for each route.") -(defn router [{:keys [clock triplestore change-store auth system-uris]}] +(defn router [{:keys [clock triplestore change-store auth system-uris] :as system}] (ring/router [["/openapi.json" {:get {:no-doc true @@ -300,7 +300,7 @@ specifications for each route.") ["/:series-slug" {:get (routes.s/get-series-route-config triplestore system-uris) :put (routes.s/put-series-route-config clock triplestore system-uris) - :delete (routes.s/delete-series-route-config triplestore change-store system-uris)}] + :delete (routes.s/delete-series-route-config system)}] ["/:series-slug/releases" {:get (routes.rel/get-release-list-route-config triplestore system-uris)}] @@ -319,7 +319,7 @@ specifications for each route.") ["/{release-slug}/schema" {:get (routes.rel/get-release-ld-schema-config triplestore system-uris) - :post (routes.rel/post-release-ld-schema-config clock triplestore system-uris)}] + :post (routes.rel/post-release-ld-schema-config system)}] ["/{release-slug}/revisions" {:post (routes.rev/post-revision-route-config triplestore system-uris) @@ -328,7 +328,7 @@ specifications for each route.") ["/:release-slug/revision" ["/:revision-id" {:name ::revision - :get (routes.rev/get-revision-route-config triplestore change-store system-uris) + :get (routes.rev/get-revision-route-config system) :post (routes.rev/post-revision-delta-config {:triplestore triplestore :change-store change-store :clock clock @@ -339,13 +339,13 @@ specifications for each route.") :get (routes.rev/get-revision-commit-route-config triplestore change-store system-uris)}] ["/:revision-id/appends" - {:post (routes.rev/post-revision-appends-route-config triplestore change-store system-uris)}] + {:post (routes.rev/post-revision-appends-route-config system)}] ["/:revision-id/retractions" - {:post (routes.rev/post-revision-retractions-route-config triplestore change-store system-uris)}] + {:post (routes.rev/post-revision-retractions-route-config system)}] ["/:revision-id/corrections" - {:post (routes.rev/post-revision-corrections-route-config triplestore change-store system-uris)}]]]]] + {:post (routes.rev/post-revision-corrections-route-config system)}]]]]] {;;:reitit.middleware/transform dev/print-request-diffs ;; pretty diffs ;;:validate spec/validate ;; enable spec validation for route data @@ -416,7 +416,7 @@ specifications for each route.") wrap-request-base-uri])))) (defmethod ig/pre-init-spec :tpximpact.datahost.ldapi.router/handler [_] - (s/keys :req-un [::clock ::triplestore ::change-store ::system-uris ::base-path] + (s/keys :req-un [::clock ::triplestore ::change-store ::system-uris ::base-path ::store-factory] :opt-un [::auth])) (defmethod ig/init-key :tpximpact.datahost.ldapi.router/handler [_ opts] diff --git a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/routes/middleware.clj b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/routes/middleware.clj index f581224d..02a008b4 100644 --- a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/routes/middleware.clj +++ b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/routes/middleware.clj @@ -28,6 +28,7 @@ :body {:message "Not acceptable. 
Only Content-Type: application/json is accepted"}}))))
 
 (defn entity-uris-from-path
+  "Puts the entity uris under [:datahost.request/uris ENTITY-KW]"
   [system-uris entities handler _id]
   {:pre [(m/validate [:set s.common/EntityType] entities)]}
   (fn entity-uris [request]
@@ -54,6 +55,29 @@
       (handler (assoc-in request [:datahost.request/entities entity-kw] entity))
       (errors/not-found-response request)))))
 
+(defn release-schema
+  "If found, puts the schema under [:datahost.request/entities :release-schema],
+  short-circuits with a 422 response otherwise."
+  [triplestore system-uris handler _id]
+  (fn inner [{{release-uri :dh/Release} :datahost.request/uris :as request}]
+    (if-let [schema (db/get-release-schema triplestore release-uri)]
+      (handler (assoc-in request [:datahost.request/entities :release-schema] schema))
+      {:status 422
+       :body {:message "No schema found for the release"
+              :release-uri (str release-uri)}})))
+
+(defn release-schema-or-not-found
+  "If found, puts the schema under [:datahost.request/entities :release-schema],
+  short-circuits with a 404 response otherwise."
+  [triplestore system-uris handler id]
+  (let [f (release-schema triplestore system-uris handler id)]
+    (fn inner [req]
+      (let [resp (f req)]
+        (if (= 422 (:status resp))
+          (errors/not-found-response req)
+          resp)))))
+
 (defn resource-exist?
   "Checks whether resource exists and short-circuits with 404 response if not.
diff --git a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/routes/release.clj b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/routes/release.clj
index 6538e97c..794df1c1 100644
--- a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/routes/release.clj
+++ b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/routes/release.clj
@@ -133,6 +133,8 @@ set `dcterms:modified` times for you."
 
 NOTE: Datahost tableschemas are extended subsets of CSVW:TableSchema."
    :handler (partial handlers/get-release-schema triplestore system-uris)
+   :middleware [[(partial middleware/entity-uris-from-path system-uris #{:dh/Release}) :release-uri]
+                [(partial middleware/release-schema-or-not-found triplestore system-uris) :release-schema-or-not-found]]
    :parameters {:path [:map
                        routes-shared/series-slug-param-spec
                        routes-shared/release-slug-param-spec]}
@@ -142,12 +144,14 @@ NOTE: Datahost tableschemas are extended subsets of CSVW:TableSchema."
    :tags ["Consumer API"]})
 
 (defn post-release-ld-schema-config
-  [clock triplestore system-uris]
+  [{:keys [clock triplestore system-uris] :as system}]
   {:summary "Create schema for a release"
    :description "Associates a Datahost TableSchema with the specified release.
 
 The supplied document should conform to the Datahost TableSchema."
-   :handler (partial handlers/post-release-schema clock triplestore system-uris)
+   :handler (partial handlers/post-release-schema system)
+   :middleware [[(partial middleware/entity-uris-from-path system-uris #{:dh/Release}) :entity-uris]
+                [(partial middleware/resource-exist?
triplestore system-uris :dh/Release) :release-exists?]] ;; NOTE: file schema JSON content string is validated within the handler itself :parameters {:body routes-shared/LdSchemaInput :path [:map diff --git a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/routes/revision.clj b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/routes/revision.clj index 609ca776..fe8e35c3 100644 --- a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/routes/revision.clj +++ b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/routes/revision.clj @@ -8,7 +8,7 @@ [tpximpact.datahost.ldapi.routes.middleware :as middleware] [tpximpact.datahost.ldapi.routes.shared :as routes-shared])) -(defn get-revision-route-config [triplestore change-store system-uris] +(defn get-revision-route-config [{:keys [triplestore change-store system-uris] :as system}] {:summary "Retrieve metadata or CSV contents for an existing revision" :description "A revision represents a named set of updates to a dataset release. @@ -22,7 +22,7 @@ Release](#/Consumer%20API/get_data__series_slug__releases__release_slug_) for a summary of the versioning model. " :coercion (rcm/create {:transformers {}, :validate false}) - :handler (partial handlers/get-revision triplestore change-store system-uris) + :handler (partial handlers/get-revision system) :middleware [[(partial middleware/csvm-request-response triplestore system-uris) :csvm-response] [(partial middleware/entity-or-not-found triplestore system-uris :dh/Revision) :entity-or-not-found] @@ -92,10 +92,11 @@ that make up that revision. [:message string?]]}} :tags ["Publisher API"]}) -(defn commit-route-base [triplestore change-store system-uris change-kind] - {:handler (partial handlers/post-change triplestore change-store system-uris change-kind) +(defn commit-route-base [{:keys [triplestore change-store system-uris] :as sys} change-kind] + {:handler (partial handlers/post-change sys change-kind) :middleware [[(partial middleware/entity-uris-from-path system-uris #{:dh/Release :dh/Revision}) :entity-uris] [(partial middleware/resource-exist? triplestore system-uris :dh/Revision) :resource-exists?] + [(partial middleware/release-schema triplestore system-uris) :release-schema] ;; don't allow changes to revision N when revision N+1 already exists [(partial middleware/resource-already-created? triplestore system-uris @@ -124,20 +125,20 @@ that make up that revision. [:status [:enum "error"]] [:message string?]]}}}) -(defn post-revision-appends-route-config [triplestore change-store system-uris] - (merge (commit-route-base triplestore change-store system-uris :dh/ChangeKindAppend) +(defn post-revision-appends-route-config [sys] + (merge (commit-route-base sys :dh/ChangeKindAppend) {:summary "Add appends changes to a Revision via a CSV file." :description "Upload a delta of rows to append to a dataset as a CSV file." :tags ["Publisher API"]})) -(defn post-revision-retractions-route-config [triplestore change-store system-uris] - (merge (commit-route-base triplestore change-store system-uris :dh/ChangeKindRetract) +(defn post-revision-retractions-route-config [sys] + (merge (commit-route-base sys :dh/ChangeKindRetract) {:summary "Add retractions changes to a Revision via a CSV file." :description "Upload a delta of rows to retract from a dataset as a CSV file." 
:tags ["Publisher API"]})) -(defn post-revision-corrections-route-config [triplestore change-store system-uris] - (merge (commit-route-base triplestore change-store system-uris :dh/ChangeKindCorrect) +(defn post-revision-corrections-route-config [sys] + (merge (commit-route-base sys :dh/ChangeKindCorrect) {:summary "Add corrections to a Revision via a CSV file." :description "Upload a file of rows containing corrected 'measures' as a CSV file. Datahost will attempt to detect which diff --git a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/routes/series.clj b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/routes/series.clj index a95aae8d..d85b3d61 100644 --- a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/routes/series.clj +++ b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/routes/series.clj @@ -77,12 +77,15 @@ _Note: Setting this parameter will override the value of `dcterms:description` i [:message string?]]}} :tags ["Publisher API"]}) -(defn delete-series-route-config [triplestore change-store system-uris] +(defn delete-series-route-config + [{:keys [triplestore change-store system-uris] :as system}] {:summary "Delete a series and all its child resources" :description "Deletes the given dataset-series and all of its child resources, i.e. releases, schemas, revisions, and commits. **WARNING: This route is highly destructive and should not be used as part of a standard workflow, as it will break commitments to dataset consumers.**" - :handler (partial handlers/delete-dataset-series triplestore change-store system-uris) + :handler (partial handlers/delete-dataset-series system) + :middleware [[(partial middleware/entity-uris-from-path system-uris #{:dh/DatasetSeries}) :entity-uris] + [(partial middleware/entity-or-not-found triplestore system-uris :dh/DatasetSeries) :entities]] :parameters {:path [:map routes-shared/series-slug-param-spec]} :responses {204 {:description "Series existed and was successfully deleted"} 404 {:description "Series does not exist"}} diff --git a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/store.clj b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/store.clj index 8a01e1b6..5c308b25 100644 --- a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/store.clj +++ b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/store.clj @@ -3,13 +3,6 @@ (defprotocol ChangeStore "Represents a repository for document changes" - (-insert-data [this data] - "Inserts a ring file upload into this store. Returns a key value which - can be used to retrieve the append data from this store using `-get-data`. 
- 
-   The file parameter should be a map with at least the following keys:
-   :tempfile - An IOFactory instance containing the append data
-   :filename - The name of the uploaded file on the request")
 
   (-insert-data-with-request
    [this request]
    "Because call to `-data-key` may be relatively expensive and
@@ -29,10 +22,6 @@
   (-delete [this data-key]
     "Deletes the data associated with data-key from this store"))
 
-(defn insert-data
-  [store data]
-  (-insert-data store data))
-
 (deftype InsertRequest [data key]
   clojure.lang.ILookup
   (valAt [this k]
diff --git a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/store/file.clj b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/store/file.clj
index 45fd3279..019a0d48 100644
--- a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/store/file.clj
+++ b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/store/file.clj
@@ -25,7 +25,7 @@
     (let [is ^java.io.InputStream input]
       (.reset is))))
 
-(defn- file->digest
+(defn file->digest
   "Computes a file digest as a string for input
   with the named digest algorithm.
@@ -68,9 +68,6 @@
         (log/debug "-insert-data-with-request" {:type (type data) :key digest})))
     digest))
 
-  (-insert-data [this {:keys [tempfile]}]
-    (store/-insert-data-with-request this (store/make-insert-request! this tempfile)))
-
   (-get-data [_this data-key]
     (let [location (file-location root-dir (.toString data-key))]
       (if (.exists location)
diff --git a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/store/sql.clj b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/store/sql.clj
new file mode 100644
index 00000000..4b3aa56f
--- /dev/null
+++ b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/store/sql.clj
@@ -0,0 +1,470 @@
+(ns tpximpact.datahost.ldapi.store.sql
+  (:require
+   [next.jdbc :as jdbc]
+   [clojure.tools.logging :as log]
+   [clojure.java.io :as io]
+   [clojure.data.csv :as csv]
+   [integrant.core :as ig]
+   [malli.core :as m]
+   [malli.error :as m.error]
+   [metrics.timers :refer [time!]]
+   [tpximpact.datahost.ldapi.metrics :as metrics]
+   [tpximpact.datahost.ldapi.store :as store]
+   [tpximpact.datahost.ldapi.store.file :as store.file]
+   [tpximpact.datahost.ldapi.models.release :as m.release]
+   [tpximpact.datahost.ldapi.schemas.common :as s.common]
+   [tpximpact.datahost.ldapi.store.sql.interface :as sql.interface :refer [submit]]
+   [tpximpact.datahost.ldapi.util.name-munging :as util.name-munging]
+   [tpximpact.datahost.ldapi.util.data.validation :as data.validation]
+   [tpximpact.datahost.ldapi.util.data.compilation :as data.compilation])
+  (:import (java.net URI)
+           (java.util.concurrent Executors ExecutorService ThreadFactory)
+           (java.util.concurrent.atomic AtomicLong)))
+
+(defrecord ObservationInsertRequest [release-uri key data op])
+
+(defn make-observation-insert-request!
+  "Analogous to `store/make-insert-request!`"
+  [release-uri data change-kind]
+  (let [op (case change-kind
+             :dh/ChangeKindAppend (int 1)
+             :dh/ChangeKindRetract (int 2)
+             :dh/ChangeKindCorrect (int 3))]
+    (when-not (instance? URI release-uri)
+      (throw (ex-info (str "Given URI does not resolve to a valid release URI " release-uri) {:uri release-uri})))
+    (->ObservationInsertRequest release-uri (store.file/file->digest data "SHA-256") data op)))
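+
+;; Illustration: `relevant-indices` maps the required column order onto the
+;; column order found in the uploaded CSV's header, e.g. (made-up values):
+;;   (relevant-indices ["a" "b"] ["b" "x" "a"]) ;; => (2 0)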
+(defn relevant-indices
+  [required-col-names found-col-names]
+  (let [found->indices (into {} (map-indexed (fn [i col-name] [col-name i]) found-col-names))
+        _ (when-not (= (count found-col-names) (count found->indices))
+            (throw (ex-info "Invalid arguments. Duplicate column names in the found columns."
+                            {:required required-col-names :found found-col-names})))
+        indices-to-extract (map #(get found->indices %) required-col-names)]
+    (assert (= (count required-col-names) (count indices-to-extract)))
+    indices-to-extract))
+
+(defn- extract-header
+  "Returns a seq of column names (the parsed first CSV line) or throws."
+  [s]
+  (let [[[header] _seq] (split-at 1 s)
+        header (first (csv/read-csv header))]
+    (when-not (seq header)
+      (throw (ex-info "No columns found in header" {:type :tpximpact.datahost.ldapi.errors/input-data-error})))
+    header))
+
+(defn- csv-line-coercers
+  "Returns a vector of one arg. coercer fns in the same order as columns
+  specified in row-schema. Throws when value can't be reasonably
+  coerced."
+  ;; TODO: find a better place to put the coercer fn. Maybe in row-schema's metadata?
+  [row-schema]
+  (let [coerce (fn type-coerce [{:dataset.column/keys [datatype name]}]
+                 (case datatype
+                   :string (fn v->string [v]
+                             (cond
+                               (string? v) v))
+                   :int (fn v->int [v]
+                          (cond (int? v) v
+                                (string? v) (Long/parseLong v)))
+                   :double (fn v->double [v]
+                             (cond
+                               (double? v) v
+                               (string? v) (Double/parseDouble v)))))]
+    (into [] (comp (map m/properties)
+                   (map coerce))
+          (m/children row-schema))))
+
+(defn- safely-read-csv-line [line] (first (csv/read-csv line)))
+
+(defn- extract-column
+  "Extracts column at idx, turns empty string values into nulls."
+  [coll idx]
+  (let [value (nth coll idx)]
+    (when-not (and (string? value) (.isBlank ^String value))
+      value)))
+
+(defn- validate-request [{:keys [release-uri] :as _store} {req-release-uri :release-uri :as request}]
+  ;; TODO: make a schema
+  (assert (:op request))
+  (assert (:key request))
+  (when-not (= req-release-uri release-uri)
+    (throw (ex-info (str "Request release URI does not match the observations store release URI of: " release-uri)
+                    {:request-uri req-release-uri :release-uri release-uri}))))
+
+(defn- insert-import-record
+  "Returns {:next.jdbc/update-count int}. Update count of zero means the
+  data for the given digest was already imported.
+
+  Parameters:
+
+  - tx - jdbc transaction
+  - insert request
+  - commit-uri
+  - header - seq of column names"
+  [tx {:keys [release-uri key] :as _request} commit-uri header]
+  (let [params {:import-uid key :release-uri release-uri}
+        insert-import-result (try
+                               (m.release/-insert-import tx (assoc params :import-id (m.release/import-id params)))
+                               (catch Exception ex
+                                 (throw (ex-info (str "Insert import failure. import_uid=" key ": " (ex-message ex))
+                                                 {:import-uid key :header header :commit-uri commit-uri}
+                                                 ex))))
+        munged-release-id (util.name-munging/sanitize-name release-uri)]
+    (log/info (format "-insert-data-with-request: commit-uri='%s', import_uid='%s', table=import_%s"
+                      commit-uri key munged-release-id)
+              insert-import-result)
+    insert-import-result))
+
+(defn- make-valid-observation-row
+  "Returns a vector to be used as an 'insert' value, columns are ordered
+  [import-id, coords-hash, synth-hash ...COLS-IN-SCHEMA-ORDER].
+
+  row-prefix is a tuple of [import-id ?coords ?synth-id], where the ?coords and ?synth-id
+  are filled out by this function so the passed in values are not used."
+  [row-prefix line seq row-schema* validator {:keys [coords-columns hasher]}]
+  (let [validator! (fn validator* [full-row]
+                     (if (validator full-row)
+                       full-row
+                       (throw (ex-info "Row does not conform to schema."
+ {:type :tpximpact.datahost.ldapi.errors/validation-failure + :import-id (nth row-prefix 0) + :extracted-row (drop (count row-prefix) full-row) + :malli/explanation (m.error/humanize (m/explain row-schema* full-row))})))) + row (into row-prefix + (map (fn extract+coerce [i+c] + (let [index (nth i+c 0) coercer (nth i+c 1)] + (coercer (extract-column (safely-read-csv-line line) index))))) + seq) + offset (count row-prefix) + coords (apply hasher (map #(nth row (+ % offset)) coords-columns)) + synth-id (apply hasher (drop offset row)) + row (-> (transient row) (assoc! 1 coords 2 synth-id) persistent!)] + (validator! row))) + +(defn- row-producer + [^java.io.Reader rdr + ^java.util.concurrent.ArrayBlockingQueue q + partition-size + ^AtomicLong row-counter + {:keys [required-columns row-schema] :as _ostore} + {import-id :import-id header :header munged-release-id :release-id* :as _derived} + {:keys [op] :as request}] + (try + (when (not row-schema) + (throw (ex-info "Missing row-schema" {:import-id import-id}))) + (let [indices (relevant-indices required-columns header) + coercers (csv-line-coercers row-schema) + _ (assert (= (count indices) (count coercers))) + ;; we extend the 'raw' release schema with SQL table's bookkeeping columns + schema* (into [:tuple :int :int :int] (m/children row-schema)) + validator (m/validator schema*) + seq (map (fn [i c] [i c]) indices coercers) + {:keys [hasher]} (data.compilation/make-change-context op row-schema) + col-indices (data.validation/row-schema-indices row-schema)] + (doseq [lines (partition-all partition-size (line-seq rdr))] + (let [rows ^java.util.List (java.util.ArrayList. (count lines)) + row-prefix [import-id nil nil]] ;; we fill out the nils in `make-valid-observation-row` + (doseq [line lines] + (.add rows (make-valid-observation-row row-prefix + line + seq + schema* + validator + (assoc col-indices :hasher hasher)))) + (.addAndGet row-counter (count rows)) + (.put q rows))) + (log/debug (format "producer thread done (table=%s)" munged-release-id)) + (.put q ::done)) + (catch Exception ex + (let [msg "ObservationStore: terminating producer thread due to error"] + (log/warn ex msg) + (.put q (ex-info msg {:import-id import-id :insert-request request} ex)))) + (catch AssertionError err + (log/error "row-producer: assertion error:" (ex-message err)) + (.put q err)))) + + +(defn- insert-data-with-request + "Note: this function performs multiple database calls. It's on the caller to + ensure the passed in db object is a transaction." + [{:keys [data-source executor ^URI release-uri row-schema required-columns munged-col-names] :as ostore} + {digest :key data :data ^URI commit-uri :commit-uri op :op :as request}] + (assert executor) + (assert commit-uri) + (assert row-schema) + (validate-request ostore request) + (time! + metrics/insert-observations + (let [partition-size 2000 ;TODO(rosado): make it configurable, probably in the insert-request map + q (java.util.concurrent.ArrayBlockingQueue. partition-size)] + (with-open [rdr (io/reader data)] + (let [header (extract-header (line-seq rdr)) + batch-counter ^AtomicLong (AtomicLong. 0) + row-counter ^AtomicLong (AtomicLong. 0) + munged-release-id (util.name-munging/sanitize-name release-uri) + t0 (System/currentTimeMillis) + tx (if (instance? 
javax.sql.DataSource data-source) ;NOTE(rosado): not happy with this conditional
+                   (jdbc/get-connection data-source)
+                   data-source)
+
+              {:next.jdbc/keys [update-count]} (.get (submit executor #(insert-import-record tx (assoc request :release-uri release-uri) commit-uri header)))]
+          (if (zero? update-count)
+            (log/info "insert-data-with-request: Data already imported for data-key. Skipping"
+                      {:data-key digest
+                       :commit-uri commit-uri
+                       :status :import.status/already-imported})
+            (do
+              (let [{import-id :id} (m.release/select-import tx {:import-uid digest} {:store ostore})]
+                (future ;TODO: setup a proper thread pool for IO
+                  (row-producer rdr q partition-size row-counter
+                                ostore
+                                {:release-id* munged-release-id :header header :import-id import-id}
+                                request))
+
+                (loop [batch (.take q)]
+                  (when (instance? Throwable batch)
+                    (throw (ex-info "Failed to import observations"
+                                    {:type :tpximpact.datahost.ldapi.errors/input-data-error
+                                     :release-uri release-uri}
+                                    batch)))
+                  (when-not (= batch ::done)
+                    (.get (submit executor
+                                  #(do
+                                     (m.release/import-observations tx {:release_id munged-release-id
+                                                                        :import_id (int import-id)
+                                                                        :column-names munged-col-names
+                                                                        :observations (seq batch)})
+                                     (let [num-batch (.incrementAndGet batch-counter)]
+                                       (log/debug (format "writing observations: table=import_%s batch=%s"
+                                                          munged-release-id num-batch))))))
+
+                    (recur (.take q)))))
+
+              (let [seconds (/ (- (System/currentTimeMillis) t0) 1000.0)
+                    num-rows (.longValue row-counter)
+                    rows-per-sec (when (< 0.0001 seconds)
+                                   (/ num-rows (float seconds)))]
+                (log/info (format (str "-insert-data-with-request: done writing release-path='%s', "
+                                       "num-batches=%s, batch-size=%s total-rows=%s took %.3fs, %s")
+                                  (.getPath release-uri) (.longValue batch-counter) partition-size num-rows
+                                  seconds
+                                  (format "%.2f rows/sec" (or rows-per-sec (float num-rows))))))))
+          (cond-> {:data-key digest :commit-uri commit-uri :status :import.status/success}
+            (zero? update-count) (assoc :status :import.status/already-imported)))))))
+
+(defn execute-insert-request*
+  [{:keys [executor data-source release-uri] :as store} {import-uid :key :as request}]
+  (validate-request store request)
+  (try
+    (insert-data-with-request store request)
+    (catch Exception ex
+      (log/warn (or (ex-cause ex) ex)
+                "execute-insert-request*: Data insert failed, marking import as 'failed', import-uid: " import-uid)
+      (.get (submit executor #(m.release/update-import-status
+                               (jdbc/get-connection data-source)
+                               {:import-id (m.release/import-id {:release-uri release-uri
+                                                                 :import-uid import-uid})
+                                :status "failed"})))
+      (throw ex))))
+
+(defprotocol ObservationRepo
+  "Operations for interacting with the repository keeping the
+  observation data.
+
+  See also:
+  - [[create-commit?]]"
+  (-execute-insert-request [this request] "Returns a {:status STATUS-KW}"))
+
+(defn execute-insert-request
+  "Returns a map of {:status STATUS-KW ...}"
+  [store request]
+  (-execute-insert-request store request))
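+
+;; Illustrative end-to-end flow (it mirrors the `commit` helper in
+;; test/.../store/sql_test.clj; `ostore`, `release-uri`, `commit-uri` and
+;; `csv-stream` are assumed to be supplied by the caller):
+;;
+;;   (let [req (-> (make-observation-insert-request! release-uri csv-stream :dh/ChangeKindAppend)
+;;                 (assoc :commit-uri commit-uri))
+;;         {:keys [status]} (execute-insert-request ostore req)]
+;;     (when (= status :import.status/success)
+;;       (complete-import ostore req))
+;;     (when (create-commit? status)
+;;       (create-commit ostore req)))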
+(defrecord ObservationsStore [data-source executor release-uri row-schema required-columns munged-col-names]
+  ObservationRepo
+  (-execute-insert-request [this request]
+    (execute-insert-request* this request))
+
+  sql.interface/SQLStoreCompatible
+  (-make-select-observations-as [_ prefix]
+    (assert (= (count required-columns) (count munged-col-names)))
+    (let [make-mangled (if (nil? prefix)
+                         identity
+                         #(str prefix "." %))]
+      (map (fn [mangled demangled] [(make-mangled mangled) demangled])
+           munged-col-names required-columns)))
+
+  ;; TODO: remove impl of `store/ChangeStore` protocol.
+  store/ChangeStore
+  (-get-data [this data-key]
+    (m.release/select-observations (jdbc/get-connection data-source)
+                                   {:release-uri release-uri :import-uid data-key}
+                                   {:store this}))
+
+  (-data-key [_ data]
+    (store.file/file->digest data "SHA-256")))
+
+(defn complete-import
+  "Completes the import related bookkeeping, with the expectation that the import request finished
+  successfully. Returns a map of {:status STATUS} or throws.
+
+  - :import.status/already-imported means the import was completed at an earlier time (e.g. file with the same
+    contents was processed already)."
+  [{db :data-source db-executor :executor :as store}
+   {^URI commit-uri :commit-uri digest :key op :op :as request}]
+  (let [base-params {:import-uid digest :release-uri (:release-uri store)}
+        update-import-status
+        (-> db-executor
+            (submit
+             #(do
+                (try
+                  (let [result (m.release/complete-import--copy-records
+                                db
+                                (merge base-params {:insert-columns (:munged-col-names store) :op op})
+                                {:store store})]
+                    (log/debugf "complete-import--copy-records commit-path=%s affected: %s rows"
+                                (.getPath commit-uri)
+                                result))
+                  (m.release/complete-import--delete-import-records db base-params {:store store})
+                  (m.release/update-import-status db {:status "completed"
+                                                      :import-id (m.release/import-id base-params)})
+                  (catch Exception ex
+                    ;; log and terminate
+                    (log/warn ex "complete-import: bookkeeping failed")))))
+            (.get))
+        {update-status-num :next.jdbc/update-count} update-import-status]
+    (log/debug (str "commit-path=" (.getPath commit-uri) " import status update: " update-status-num))
+    (when (zero? update-status-num)
+      (throw (ex-info "Could not complete import. Status update failed"
+                      {:update-status-count update-status-num
+                       :import-uid digest
+                       :request request})))
+    {:status :import.status/completed}))
+
+
+(def ^:private import-status-h
+  (-> (make-hierarchy)
+      (derive :import.status/already-imported :datahost.sql.commit/create)
+      (derive :import.status/success :datahost.sql.commit/create)))
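+
+;; Illustrative cases: both "success" and "already imported" statuses derive
+;; from :datahost.sql.commit/create in the hierarchy above, so
+;;   (create-commit? :import.status/success)          ;; => true
+;;   (create-commit? :import.status/already-imported) ;; => true
+;; while any other status returns false.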
+(defn create-commit?
+  "Should we create a 'commit' based on the import status?"
+  [import-status]
+  (isa? import-status-h import-status :datahost.sql.commit/create))
+
+(defn create-commit
+  [{db :data-source db-executor :executor :as _store}
+   {^URI commit-uri :commit-uri ^URI release-uri :release-uri import-uid :key op :op
+    :as request}]
+  (let [[_ rev com] (re-find #"^.*\/revision\/(\d+)\/commit\/(\d+)$" (str commit-uri))
+        rev-id (Long/parseLong rev)
+        com-id (Long/parseLong com)
+        update-count (-> db-executor
+                         (submit #(m.release/insert-commit db {:release-uri release-uri
+                                                               :uid commit-uri
+                                                               :revision-id rev-id
+                                                               :change-id com-id
+                                                               :op op
+                                                               :import_uid import-uid}))
+                         (.get)
+                         :next.jdbc/update-count)]
+    (when-not (= 1 update-count)
+      (log/debugf "create-commit/insert-failed: commit-path=%s" (.getPath commit-uri))
+      (throw (ex-info (format "Could not create commit %s" commit-uri) {:insert-request request})))))
+
+(defn make-observation-store
+  [db db-executor release-uri row-schema]
+  (when-not (m/validate :datahost/uri release-uri {:registry s.common/registry})
+    (throw (ex-info "Invalid release-uri" {:uri release-uri})))
+  (let [{:keys [col-names original-column-names]} (util.name-munging/observation-column-defs-sql {:row-schema row-schema})]
+    (->ObservationsStore db
+                         db-executor
+                         release-uri
+                         row-schema
+                         original-column-names
+                         col-names)))
+
+;;; TODO: writes in `replay-commits` should happen on the executor thread
+
+(defn replay-commits
+  [tx {:keys [store ^URI commit-uri snapshot-table]}]
+  (let [opts {:store store}
+        commit-ids (reverse (m.release/get-commit-ids tx {:commit-uri commit-uri :release-uri (:release-uri store)}))
+        _ (m.release/create-temporary-snapshot-table tx {:table-name snapshot-table} opts)
+        result (m.release/select-commit-observations-into tx {:snapshot-table snapshot-table
+                                                              :commit-uri commit-uri
+                                                              :commit-id (:id (first commit-ids))}
+                                                          opts)
+        first-ids (map :id (take 3 commit-ids))]
+    (log/info (format "replay-commits/commit-path=%s found %s commits, first %s ids="
+                      (.getPath commit-uri) (count commit-ids) (count first-ids))
+              first-ids)
+    (log/debug {:replay-commits/select-commit-obs-into result})
+    (doseq [{commit-uid :uid op :op commit-id :id} (next commit-ids)
+            :let [params {:snapshot-table snapshot-table
+                          :commit-id commit-id
+                          :commit-uri ^java.net.URI (java.net.URI. commit-uid)}]]
+      (cond
+        (= op 1)
+        (let [append-result (m.release/commit-op-append tx params opts)]
+          (log/debug (format "replay-commits/commit(append): %s" (.getPath commit-uri)) append-result))
+
+        (= op 2)
+        (let [retraction-result (m.release/commit-op-retract tx params opts)]
+          (log/debug (format "replay-commits/commit(retraction): %s" (.getPath commit-uri)) retraction-result))
+
+        (= op 3)
+        (let [temp-table (str "scratch_" (java.util.UUID/randomUUID))]
+          (m.release/-create-corrections-scratch-table tx {:table-name temp-table} opts)
+          (m.release/populate-corrections-scratch-table tx (assoc params :table-name temp-table) opts)
+          (m.release/-commit-op-correct--delete-stale-records tx (assoc params :table-name temp-table))
+          (let [result (m.release/commit-op-correct--insert-updated-records tx (assoc params :table-name temp-table) opts)]
+            (log/debug (format "replay-commits/commit(correction): %s, insert updated records result: "
+                               (.getPath commit-uri))
+                       result
+                       {:temp-table temp-table})))
+        :else (throw (ex-info (format "Unknown 'op' value='%s'" op) {:commit-uri commit-uri :op op}))))))
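+
+;; Illustrative usage (as exercised by test/.../store/sql_test.clj):
+;;   (replay-commits conn {:store ostore
+;;                         :commit-uri commit-uri
+;;                         :snapshot-table "snapshot_1"})
+;; after which `m.release/materialize-snapshot` can read the dataset back
+;; from the snapshot table.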
as count" + "table" table-name]) + :count))) + +;; (defn pg-table-exists? +;; [conn table-name] +;; (= 1 (-> conn +;; (jdbc/execute-one! ["select exists ( select 1 from information_schema.tables WHERE table_name = ? ) AS count" +;; table-name])))) + +(defmethod ig/init-key ::store-factory [_ {:keys [data-source db-executor]}] + (log/info "making observation store factory: " data-source) + (partial make-observation-store data-source db-executor)) + +(defmethod ig/halt-key! ::store-factory [_ _]) + +(defmethod ig/init-key ::executor [_ {_config :config}] + (let [thread-factory (reify ThreadFactory + (newThread [_this runnable] + (let [t (Thread. runnable)] + (doto t + (.setName (str "sqlexec-" (.getId t))))))) + exec (Executors/newSingleThreadExecutor thread-factory)] + exec)) + +(defmethod ig/halt-key! ::executor [_ ^ExecutorService executor] + (.shutdown executor)) + +(defmethod ig/init-key ::data-source [_ {:keys [db-config]}] + ;; NOTE(rosado): using 'jdbc/get-datasource' here resulted no data + ;; being written to the database. Possibly a configuration issue, + ;; but it ate too much time, so doing it the easy way for now + (let [{:keys [spec user password]} db-config] + (reify javax.sql.DataSource + (getConnection [_this] + (jdbc/get-connection spec user password))))) + +(defmethod ig/halt-key! ::data-source [_ _data-source]) + diff --git a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/store/sql/interface.clj b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/store/sql/interface.clj new file mode 100644 index 00000000..b900543a --- /dev/null +++ b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/store/sql/interface.clj @@ -0,0 +1,19 @@ +(ns tpximpact.datahost.ldapi.store.sql.interface + (:import (java.util.concurrent ExecutorService Future))) + +(defprotocol SQLStoreCompatible + (-make-select-observations-as [this prefix] + "Returns a seq of tuple. 'as-name' is going + to be used as an alias in a 'select' statement as in: + + ```sql + select foo as \"My Column\" from observations; + ```")) + +(defn submit + "Submit f to the executor service. + + Motivation: for some databases (eq. SQLite) we want to ensure + there's only one writer. Returns a Future." + (^Future [^ExecutorService executor ^Callable f] + (.submit executor ^Callable f))) diff --git a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/util/data/compilation.clj b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/util/data/compilation.clj index 05627b9d..61a73e12 100644 --- a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/util/data/compilation.clj +++ b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/util/data/compilation.clj @@ -22,7 +22,7 @@ ;;; use content hash (a store key) to get the data (defmethod -as-dataset String [v {:keys [store] :as opts}] - (assert (= 64 (count v))) + (assert (or (= 64 (count v) (and (string? v) (.startsWith v "http")))) (format "got: '%s'" v)) (assert store) (with-open [is (store/get-data store v)] (as-dataset is opts))) @@ -102,6 +102,8 @@ (def ^:private compile-dataset-opts-valid? 
(m/validator CompileDatasetOptions)) +(def make-change-context internal/make-change-context) + (defn- compile-reducer "Reducer function for creating a dataset out of a seq of [[ChangeInfo]]s" [{row-schema :row-schema :as opts} ds change] diff --git a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/util/data/delta.clj b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/util/data/delta.clj index f47aa9f8..d4cfc0b4 100644 --- a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/util/data/delta.clj +++ b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/util/data/delta.clj @@ -25,7 +25,6 @@ (assoc row "datahost.row/id" id data.internal/coords-column-name coords))) (defn- make-column-names - "Returns a map of {:coords {:left ... :right ...} :measure {:left ... right ...} :id RIGHT-ID-COL-NAME}" [right-ds-name measure-column-name coord-column-names] (let [right-measure-column-name (data.internal/r-joinable-name right-ds-name measure-column-name) right-coord-column-names (map #(data.internal/r-joinable-name right-ds-name %) diff --git a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/util/data/internal.clj b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/util/data/internal.clj index abd68ff3..ecf6621f 100644 --- a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/util/data/internal.clj +++ b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/util/data/internal.clj @@ -131,4 +131,5 @@ {:row-schema row-schema :datahost.change/kind change-kind :coords-columns (row-schema--extract-component-column-names row-schema) - :measure-column (row-schema--extract-measure-column-name row-schema)}) + :measure-column (row-schema--extract-measure-column-name row-schema) + :hasher (make-columnwise-hasher)}) diff --git a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/util/data/validation.clj b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/util/data/validation.clj index cbabbfdc..fffaee07 100644 --- a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/util/data/validation.clj +++ b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/util/data/validation.clj @@ -122,10 +122,9 @@ (let [validator (m/validator DatasetRow)] (fn row-schema-validator [schema] (or (validator schema) - (throw (java.lang.AssertionError. - "Invalid DatasetRow schema" - (ex-info "Invalid DatasetRow schema" - {:malli/explanation (me/humanize (m/explain DatasetRow schema))}))))))) + (throw (ex-info "Invalid DatasetRow schema" + {:malli/explanation (me/humanize (m/explain DatasetRow schema)) + :schema schema})))))) (defn- set-col-schema-props @@ -194,13 +193,19 @@ ([json-ld-schema column-names] (make-row-schema json-ld-schema column-names {})) ([json-ld-schema column-names options] - (make-row-schema* (schema-columns json-ld-schema) - column-names - {:name-key (:name column-uri-keys) - :type-key (:type column-uri-keys) - :column-datatype-extractor ld-column-datatype-extractor - :uri-lookup first - :unpack? true} ))) + (let [col-number-key (URI. "http://www.w3.org/ns/csvw#number") + key-fn #(first (get % col-number-key)) + ordered-cols (sort-by key-fn (schema-columns json-ld-schema)) + col-name-key (URI. "http://www.w3.org/ns/csvw#name") + get-col-name #(first (get % col-name-key)) + col-names (map get-col-name ordered-cols)] + (make-row-schema* ordered-cols + col-names + {:name-key (:name column-uri-keys) + :type-key (:type column-uri-keys) + :column-datatype-extractor ld-column-datatype-extractor + :uri-lookup first + :unpack? 
true} ))))
 
 (defn make-row-schema-from-json
   "Make a malli schema for a tuple of row values specified by
@@ -214,12 +219,23 @@
   (when-not (validate-ld-release-schema-input-valid? json)
     (throw (ex-info "Invalid input release schema" {:release-schema json})))
   (make-row-schema* (get json "dh:columns")
-                    column-names
+                    column-names ;TODO(rosado): ensure col names are passed
                     {:name-key "csvw:name"
                      :type-key "@type"
                      :column-datatype-extractor json-column-datatype-extractor
                      :uri-lookup uris/json-column-types})))
 
+(defn row-schema-indices
+  "Returns {:measure-column int :coords-columns seq}"
+  [row-schema]
+  (let [{:keys [measure]} uris/column-types]
+    (reduce (fn [m [index props]]
+              (if (= measure (:dataset.column/type props))
+                (assoc m :measure-column index)
+                (update m :coords-columns conj index)))
+            {:coords-columns []}
+            (map-indexed (fn [i child] [i (m/properties child)]) (m/children row-schema)))))
+
 (defn parsing-errors [column]
   (assert (instance? tech.v3.dataset.impl.column.Column column))
   (-> column meta :unparsed-data))
diff --git a/datahost-ld-openapi/src/tpximpact/datahost/ldapi/util/name_munging.clj b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/util/name_munging.clj
new file mode 100644
index 00000000..faec4b51
--- /dev/null
+++ b/datahost-ld-openapi/src/tpximpact/datahost/ldapi/util/name_munging.clj
@@ -0,0 +1,75 @@
+(ns tpximpact.datahost.ldapi.util.name-munging
+  "Utilities for turning arbitrary strings into identifiers valid in SQL"
+  (:require
+   [clojure.string :as string]
+   [malli.core :as m]
+   [tpximpact.datahost.uris :as uris]))
+
+(defn uri->series+release [^java.net.URI uri]
+  (let [[_ series-name release-name] (re-find #"^\/\w+\/(.*)\/release\/([\w-_\.^\/]+)[\/]{0,1}.*" (.getPath uri))]
+    (when release-name
+      [series-name release-name])))
+
+(defprotocol NameSanitizer
+  (-sanitize-name [this] "Returns a string"))
+
+(defprotocol NameAdapter
+  (-sanitize [this v] "Returns a string")
+  (-observation-column-name [this v] "Returns a string with a prefix appropriate for the column type"))
+
+(defrecord PrefixSQLNameAdapter []
+  NameAdapter
+  (-sanitize [this s]
+    s)
+
+  (-observation-column-name [this props]
+    (str (condp = (:dataset.column/type props)
+           (:measure uris/column-types) "m::"
+           (:dimension uris/column-types) "dim::"
+           (:attribute uris/column-types) "attr::")
+         (-sanitize this (:dataset.column/name props)))))
+
+(extend-protocol NameSanitizer ;TODO: improve naming. not specific enough
+  java.net.URI
+  (-sanitize-name [this]
+    (let [[series-name release-name] (uri->series+release this)]
+      (when-not (and series-name release-name)
+        (throw (ex-info "URI should identify a release." {:this this})))
+      (string/replace (str series-name "/" release-name) #"\." "_"))))
+
+(defn sanitize-name
+  [v]
+  (-sanitize-name v))
+
+(defn make-name-adapter
+  []
+  (->PrefixSQLNameAdapter))
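+
+;; Illustrative examples:
+;;   (demunge-name "dim::area") ;; => "area"
+;;   (demunge-name "area")      ;; => nil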
+(defn demunge-name
+  "Returns the demunged name or nil."
+  [s]
+  (when-let [m (re-find #"^(m::|attr::|dim::)(.*)$" s)]
+    (nth m 2)))
+
+(defn observation-column-defs-sql
+  "Returns a map of {:original-column-names vector, :col-names vector, :col-spec seq<[name munged-name sql-string]>}"
+  [{:keys [row-schema]}]
+  (let [adapter (make-name-adapter)
+        seq
+        (sequence (comp (map m/properties)
+                        (map (fn to-tuples [props]
+                               (let [munged-name (-observation-column-name adapter props)
+                                     dt (:dataset.column/datatype props)]
+                                 [(:dataset.column/name props)
+                                  munged-name
+                                  ;; TODO: find better place for the below, probably a builder fn
+                                  ;; should be passed in
+                                  (case dt
+                                    :string "varchar(128)"
+                                    :int "integer"
+                                    :double "double")]))))
+                  (m/children row-schema))]
+    {:original-column-names (mapv #(nth % 0) seq)
+     :col-names (mapv #(nth % 1) seq)
+     :col-spec seq}))
diff --git a/datahost-ld-openapi/src/tpximpact/datahost/system_uris.clj b/datahost-ld-openapi/src/tpximpact/datahost/system_uris.clj
index 0d696ee0..e66a3d07 100644
--- a/datahost-ld-openapi/src/tpximpact/datahost/system_uris.clj
+++ b/datahost-ld-openapi/src/tpximpact/datahost/system_uris.clj
@@ -113,3 +113,19 @@
   match the typical path params."
   [resource system-uris params]
   (-resource-uri resource system-uris params))
+
+(defn previous-commit-coords
+  "Given revision and commit id, tries to find the preceding commit's
+  revision and change ids. If not possible, indicates an extra DB
+  lookup is needed.
+
+  Returns [:new :new], [rev-id change-id], or [rev-id :find]."
+  [revision-id change-id]
+  {:pre [(pos? revision-id) (pos? change-id)]}
+  (cond
+    (and (= revision-id 1) (= 1 change-id)) [:new :new]
+
+    (< 1 change-id) [revision-id (dec change-id)]
+
+    ;; we need to find the last change-id in previous revision
+    :else [(dec revision-id) :find]))
diff --git a/datahost-ld-openapi/test/tpximpact/datahost/ldapi/client/ring.clj b/datahost-ld-openapi/test/tpximpact/datahost/ldapi/client/ring.clj
index fb99ff1b..a1e9ceb9 100644
--- a/datahost-ld-openapi/test/tpximpact/datahost/ldapi/client/ring.clj
+++ b/datahost-ld-openapi/test/tpximpact/datahost/ldapi/client/ring.clj
@@ -52,7 +52,7 @@
                            {"dcterms:title" "title"
                             "dcterms:description" "description"}))
          :headers {"content-type" "text/csv"}
-         :body data}
+         :body (clojure.java.io/input-stream (.getBytes data))}
         response (handler request)]
     (if (= 201 (:status response))
       (resources/on-created change response)
@@ -95,4 +95,4 @@
                  :headers {"accept" "application/json"}}
         {:keys [status body] :as _response} (handler request)]
     (when (= 200 status)
-      (json/read-str body))))
\ No newline at end of file
+      (json/read-str body))))
diff --git a/datahost-ld-openapi/test/tpximpact/datahost/ldapi/models/release_test.clj b/datahost-ld-openapi/test/tpximpact/datahost/ldapi/models/release_test.clj
index 93cbce46..d631ce96 100644
--- a/datahost-ld-openapi/test/tpximpact/datahost/ldapi/models/release_test.clj
+++ b/datahost-ld-openapi/test/tpximpact/datahost/ldapi/models/release_test.clj
@@ -221,31 +221,3 @@
           (select-keys body' ["dcterms:issued" "dcterms:modified"]))
         "The document shouldn't be modified")))))))))
 
-(deftest csvm-release-test
-  (th/with-system-and-clean-up {{:keys [GET PUT]} :tpximpact.datahost.ldapi.test/http-client
-                                :as sys}
-
-    (let [new-series-slug (str "new-series-" (UUID/randomUUID))
-          new-series-path (str "/data/" new-series-slug)
-          release-1-id (str "release-" (UUID/randomUUID))
-          release-1-path (str new-series-path "/release/" release-1-id)
-          release-1-csvm-path (str release-1-path "-metadata.json")]
-
-      (testing "Fetching csvw metadata for a release that does
not exist returns 'not found'" - (let [{:keys [status body]} (GET release-1-csvm-path)] - (is (= 404 status)) - (is (= "Not found" body)))) - - (PUT new-series-path - {:content-type :json - :body (json/write-str {"dcterms:title" "A title" - "dcterms:description" "Description"})}) - - (PUT release-1-path - {:content-type :json - :body (json/write-str {"dcterms:title" "Example Release" - "dcterms:description" "Description"})}) - - (testing "Fetching existing release with no revisions" - (let [response (GET (str release-1-path ".csv"))] - (is (= 422 (:status response)))))))) diff --git a/datahost-ld-openapi/test/tpximpact/datahost/ldapi/models/series_test.clj b/datahost-ld-openapi/test/tpximpact/datahost/ldapi/models/series_test.clj index 41d8a69a..d813d3cd 100644 --- a/datahost-ld-openapi/test/tpximpact/datahost/ldapi/models/series_test.clj +++ b/datahost-ld-openapi/test/tpximpact/datahost/ldapi/models/series_test.clj @@ -409,62 +409,3 @@ (t/is (= false (resource-exists-in-db? client schema))) (t/is (= false (resource-exists-in-db? client revision))) (t/is (= false (resource-exists-in-db? client change)))))) - -(t/deftest delete-series-with-shared-change-test - ;; test that creates two series with their own releases, revisions and a - ;; shared schema. Both contain a change with the same contents - these - ;; changes should have the same address within the store. The test deletes - ;; one of the series (and therefore its associated change) and checks that - ;; the contents of the change in the other series can still be fetched - (with-open [client (ring-client/create-client)] - (let [series1-info (resources/create-series "series1" {"dcterms:title" "Series 1" "dcterms:description" "Series 1"}) - series2-info (resources/create-series "series2" {"dcterms:title" "Series 2" "dcterms:description" "Series 2"}) - - release1-info (resources/create-release "release1" {"dcterms:title" "Release 1" "dcterms:description" "Release 1"}) - release2-info (resources/create-release "release2" {"dcterms:title" "Release 2" "dcterms:description" "Release 2"}) - - schema-columns [{"@type" "dh:DimensionColumn" - "csvw:datatype" "string" - "csvw:name" "col1" - "csvw:titles" "col1"} - {"@type" "dh:DimensionColumn" - "csvw:datatype" "string" - "csvw:name" "col2" - "csvw:titles" "col2"} - {"@type" "dh:MeasureColumn" - "csvw:datatype" "string" - "csvw:name" "col3" - "csvw:titles" "col3"}] - - schema1-info (resources/create-schema {"dcterms:title" "Schema 1" - "dh:columns" schema-columns}) - schema2-info (resources/create-schema {"dcterms:title" "Schema 2" - "dh:columns" schema-columns}) - - revision1-info (resources/create-revision {"dcterms:title" "Revision 1" "dcterms:description" "Revision 1"}) - revision2-info (resources/create-revision {"dcterms:title" "Revision 2" "dcterms:description" "Revision 2"}) - - change-data [["col1" "col2" "col3"] - ["a" "b" "c"] - ["1" "2" "3"]] - - change1-info (resources/create-change :dh/ChangeKindAppend change-data {"dcterms:description" "Change 1"}) - change2-info (resources/create-change :dh/ChangeKindAppend change-data {"dcterms:description" "Change 2"}) - - series1 (ring-client/create-resource client (time/system-now) series1-info) - release1 (create-child client series1 release1-info) - schema1 (create-child client release1 schema1-info) - revision1 (create-child client release1 revision1-info) - change1 (create-child client revision1 change1-info) - - series2 (ring-client/create-resource client (time/system-now) series2-info) - release2 (create-child client series2 
release2-info)
-          schema2 (create-child client release2 schema2-info)
-          revision2 (create-child client release2 revision2-info)
-          change2 (create-child client revision2 change2-info)]
-
-      (ring-client/delete-resource client (time/system-now) series1)
-
-      ;; change for series2 should still exist
-      (let [fetched-change (ring-client/get-resource client change2)]
-        (t/is (some? fetched-change))))))
diff --git a/datahost-ld-openapi/test/tpximpact/datahost/ldapi/store/file_test.clj b/datahost-ld-openapi/test/tpximpact/datahost/ldapi/store/file_test.clj
index 5bab8db3..27fea89d 100644
--- a/datahost-ld-openapi/test/tpximpact/datahost/ldapi/store/file_test.clj
+++ b/datahost-ld-openapi/test/tpximpact/datahost/ldapi/store/file_test.clj
@@ -35,7 +35,7 @@ (defn- add-to-store [store files m contents]
   (let [f (new-temp-file files "filestore-test")]
     (spit f contents)
-    (let [key (store/insert-data store {:tempfile f :filename (.getName f)})]
+    (let [key (store/request-data-insert store (store/make-insert-request! store f))]
      (assoc m key contents))))
 
 (def file-contents-gen gen/string)
diff --git a/datahost-ld-openapi/test/tpximpact/datahost/ldapi/store/sql_test.clj b/datahost-ld-openapi/test/tpximpact/datahost/ldapi/store/sql_test.clj
new file mode 100644
index 00000000..f4e669a5
--- /dev/null
+++ b/datahost-ld-openapi/test/tpximpact/datahost/ldapi/store/sql_test.clj
@@ -0,0 +1,164 @@
+(ns ^:sql tpximpact.datahost.ldapi.store.sql-test
+  (:require
+   [clojure.test :refer [deftest is testing use-fixtures]]
+   [clojure.tools.logging :as log]
+   [clojure.string :as string]
+   [malli.core :as m]
+   [malli.error :as m.e]
+   [next.jdbc :as jdbc]
+   [next.jdbc.result-set :refer [as-unqualified-maps]]
+   [next.jdbc.result-set :as jdbc.rs]
+   [tpximpact.test-helpers :as th :refer [*system*]]
+   [tpximpact.datahost.ldapi.routes.shared :as routes-shared]
+   [tpximpact.datahost.ldapi.store :as store]
+   [tpximpact.datahost.ldapi.store.sql :as store.sql]
+   [tpximpact.datahost.ldapi.util.data.validation :as d.validation]
+   [tpximpact.datahost.ldapi.models.release :as m.release]
+   [tpximpact.datahost.ldapi.util.name-munging :as util.munge])
+  (:import
+   (java.sql Clob
+             ResultSet ResultSetMetaData
+             Statement
+             SQLException)))
+
+
+(use-fixtures :each th/with-system-fixture)
+
+(defn make-test-db
+  [data-source]
+  (jdbc/get-connection data-source))
+
+(defn- humanize [v]
+  (-> (m/explain routes-shared/LdSchemaInput v) m.e/humanize))
+
+(def row-schema (d.validation/make-row-schema-from-json
+                 {"dh:columns" [{"@type" "dh:DimensionColumn"
+                                 "csvw:datatype" "string"
+                                 "csvw:name" "some dimension!"}
+                                {"@type" "dh:MeasureColumn"
+                                 "csvw:datatype" "int"
+                                 "csvw:name" "some measure!"}]}))
+
+
+(def CSV (.getBytes "some measure!,some dimension!
+1,a
+2,b
+3,c"))
+
+(def CSV2 (.getBytes "some measure!,some dimension!
+4,d
+1,a")) ;; -> after retraction: ((b, ...), (c, ...) )
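+
+;; The test below replays: append CSV, append CSV2, retract CSV2, correct CSV3.
+;; Expected snapshots after each commit: {a b c} -> {a b c d} -> {b c} ->
+;; {b c} with c's measure corrected to 33 (see `expected-datasets` below).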
+44,d
+1,a
+33,c")) ;; -> (2, 'b'), (33, 'c')
+
+(def expected-datasets
+  (let [D (fn [v] [(keyword "some dimension!") v])
+        M (fn [v] [(keyword "some measure!") v])
+        R (fn [m d] (into {} [(M m) (D d)]))]
+    {:materialised-1 [(R 1 "a") (R 2 "b") (R 3 "c")]
+     :materialised-2 [(R 1 "a") (R 2 "b") (R 3 "c") (R 4 "d")]
+     :materialised-3 [(R 2 "b") (R 3 "c")]
+     :materialised-4 [(R 2 "b") (R 33 "c")]}))
+
+(defrecord MapResultSetBuilder [^ResultSet rs rsmeta cols]
+  jdbc.rs/RowBuilder
+  (->row [this] (transient {}))
+  (column-count [this] (count cols))
+  (with-column [this row i]
+    (jdbc.rs/with-column-value this row (nth cols (dec i))
+      (jdbc.rs/read-column-by-index (.getObject rs ^Integer i) rsmeta i)))
+  (with-column-value [this row col v]
+    (assoc! row (string/replace-first (name col) #"^(dim::|m::|attr::)(.*)" "$2") v))
+  (row! [this row] (persistent! row))
+  jdbc.rs/ResultSetBuilder
+  (->rs [this] (transient []))
+  (with-row [this mrs row]
+    (conj! mrs row))
+  (rs! [this mrs] (persistent! mrs)))
+
+(defn commit
+  [conn {:keys [^java.net.URI release-uri] :as ostore} commit-uri kind input-stream]
+  (let [insert-req (store.sql/make-observation-insert-request! release-uri input-stream kind)
+        insert-req (assoc insert-req :commit-uri commit-uri)
+        {import-uid :data-key status :status} (store.sql/execute-insert-request ostore insert-req)]
+    (when (= status :import.status/success)
+      (store.sql/complete-import ostore insert-req))
+    (log/debugf "commit: commit-path=%s status=%s" (.getPath commit-uri) status)
+    (assert (store.sql/create-commit? status))
+    (store.sql/create-commit ostore insert-req)))
+
+(deftest from-zero-to-snapshot
+  (let [{{:keys [data-source db-executor store-factory]} :tpximpact.datahost.ldapi.test/sql-db} @*system*]
+    (with-open [conn (make-test-db data-source)]
+      (assert (store.sql/sqlite-table-exist? conn "imports"))
+      (is (m.release/db-ok? conn) "The `imports` table should exist.")
+      (jdbc/with-transaction [tx conn]
+        (let [conn tx
+              release-uri (java.net.URI. "http://localhost/data/series1/release/REL1")
+              parent-commit nil
+              commit-uri (java.net.URI. "http://localhost/data/series1/release/REL1/revision/1/commit/1")
+              input-data (java.io.ByteArrayInputStream. CSV)
+              _ (m.release/create-release-tables conn {:row-schema row-schema :release-uri release-uri} {:quoting :ansi})
+
+              ;;conn' conn
+              ;;conn (jdbc/with-logging conn (fn [s v] (log/debug "SQL::" s "\n" v)))
+
+              insert-req (store.sql/make-observation-insert-request! release-uri input-data :dh/ChangeKindAppend)
+              insert-req (assoc insert-req :commit-uri commit-uri)
+              ostore (store.sql/make-observation-store conn db-executor release-uri row-schema)
+              {import-uid :data-key status :status} (store.sql/execute-insert-request ostore insert-req)
+              _ (is (= :import.status/success status))
+              imports (m.release/select-imported-observations conn {:import-uid import-uid} {:store ostore})
+              _ (store.sql/complete-import ostore insert-req)
+              _ (store.sql/create-commit ostore insert-req)
+
+              observations (m.release/select-observations conn {:import-uid import-uid} {:quoting :ansi :store ostore})
+              imports' (m.release/select-imported-observations conn {:import-uid import-uid} {:store ostore})
+              _ (is (empty? imports') "The imports::... table should be empty after completed import")
+              commit-ids (m.release/get-commit-ids conn {:commit-uri commit-uri :release-uri release-uri})
+
+              _ (is (= 1 (count (jdbc/execute! conn ["select * from commits"]))))
+              _ (store.sql/replay-commits conn {:store ostore
+                                                :snapshot-table "snapshot_1"
+                                                :commit-uri commit-uri})
+
+              commit-uri-2 (java.net.URI. "http://localhost/data/series1/release/REL1/revision/1/commit/2")
+              _ (commit conn ostore commit-uri-2 :dh/ChangeKindAppend (java.io.ByteArrayInputStream. CSV2))
+              _ (is (= 2 (count (jdbc/execute! conn ["select * from commits"]))))
+
+              commit-uri-3 (java.net.URI. "http://localhost/data/series1/release/REL1/revision/1/commit/3")
+              _ (commit conn ostore commit-uri-3 :dh/ChangeKindRetract (java.io.ByteArrayInputStream. CSV2))
+
+              commit-uri-4 (java.net.URI. "http://localhost/data/series1/release/REL1/revision/1/commit/4")
+              _ (commit conn ostore commit-uri-4 :dh/ChangeKindCorrect (java.io.ByteArrayInputStream. CSV3))
+
+              materialise-opts {:store ostore :builder-fn as-unqualified-maps}]
+
+          ;; materialise the dataset snapshots to different tables
+          (store.sql/replay-commits conn {:store ostore :commit-uri commit-uri :snapshot-table "snapshot_1_again"})
+          (store.sql/replay-commits conn {:store ostore :commit-uri commit-uri-2 :snapshot-table "snapshot_2"})
+          (store.sql/replay-commits conn {:store ostore :commit-uri commit-uri-3 :snapshot-table "snapshot_3"})
+          (store.sql/replay-commits conn {:store ostore :commit-uri commit-uri-4 :snapshot-table "snapshot_4"})
+
+          (let [m2 (m.release/materialize-snapshot conn
+                                                   {:release-uri release-uri
+                                                    :snapshot-table "snapshot_2"}
+                                                   materialise-opts)
+                m3 (m.release/materialize-snapshot conn
+                                                   {:release-uri release-uri
+                                                    :snapshot-table "snapshot_3"}
+                                                   materialise-opts)
+                m4 (m.release/materialize-snapshot conn
+                                                   {:release-uri release-uri
+                                                    :snapshot-table "snapshot_4"}
+                                                   materialise-opts)]
+            (is (= (:materialised-2 expected-datasets) m2))
+            (is (= (:materialised-3 expected-datasets) m3))
+            (is (= (:materialised-4 expected-datasets) m4)))
+
+          (is (not (empty? imports)))
+          (is (not (empty? observations)))
+          (is (empty? imports')))))))
+
diff --git a/datahost-ld-openapi/test/tpximpact/datahost/ldapi/store/temp_file_store.clj b/datahost-ld-openapi/test/tpximpact/datahost/ldapi/store/temp_file_store.clj
index 87f7d1bd..c0a60886 100644
--- a/datahost-ld-openapi/test/tpximpact/datahost/ldapi/store/temp_file_store.clj
+++ b/datahost-ld-openapi/test/tpximpact/datahost/ldapi/store/temp_file_store.clj
@@ -7,8 +7,6 @@
 (defrecord TempFileStore [file-store]
   store/ChangeStore
-  (-insert-data [_this file]
-    (store/insert-data file-store file))
   (-insert-data-with-request [_this request]
     (store/-insert-data-with-request file-store request))
   (-get-data [_this data-key]
diff --git a/datahost-ld-openapi/test/tpximpact/test_helpers.clj b/datahost-ld-openapi/test/tpximpact/test_helpers.clj
index 897aa1c8..38e96f39 100644
--- a/datahost-ld-openapi/test/tpximpact/test_helpers.clj
+++ b/datahost-ld-openapi/test/tpximpact/test_helpers.clj
@@ -1,6 +1,7 @@
 (ns tpximpact.test-helpers
   (:require [clojure.test :refer :all]
             [integrant.core :as ig]
+            [next.jdbc :as jdbc]
             [tpximpact.datahost.system-uris :as su]
             [tpximpact.db-cleaner :as dc]
             [tpximpact.datahost.sys :as sys]
@@ -11,6 +12,31 @@
 (defmethod ig/init-key :tpximpact.datahost.ldapi.test/http-client [_ config]
   (http-client/make-client config))
 
+(defmethod ig/init-key :tpximpact.datahost.ldapi.test/sql-db [_ m]
+  m)
+
+(defmethod ig/init-key :tpximpact.datahost.ldapi.test/sqlite-connection [_ {{:keys [spec user password]} :db-config}]
+  (jdbc/get-connection spec user password))
+
+(defmethod ig/halt-key! :tpximpact.datahost.ldapi.test/sqlite-connection [_ connection]
+  (.close connection))
+
+(defmethod ig/init-key :tpximpact.datahost.ldapi.test/data-source [_ {:keys [connection]}]
+  ;; test-only DataSource that always hands out the single shared connection
+  (reify javax.sql.DataSource
+    (getConnection [this]
+      connection)))
+
+(defmethod ig/halt-key! :tpximpact.datahost.ldapi.test/data-source [_ _])
+
+(defmethod ig/halt-key! :tpximpact.datahost.ldapi.test/sql-db [_ {{:keys [spec user password]} :db-config}]
+  (with-open [conn (jdbc/get-connection spec user password)]
+    (try
+      ;;(.execute (.createStatement conn) "SHUTDOWN") ; explicit SHUTDOWN is H2-specific; disabled while tests run on SQLite
+      (catch RuntimeException ex
+        (when-not (isa? (type (ex-cause ex)) java.sql.SQLException)
+          (throw ex))))))
+
 (defn start-system [configs]
   (-> configs
       (sys/load-configs)
diff --git a/datahost-ld-openapi/tests.edn b/datahost-ld-openapi/tests.edn
index a9176ab9..6893044f 100644
--- a/datahost-ld-openapi/tests.edn
+++ b/datahost-ld-openapi/tests.edn
@@ -5,7 +5,10 @@
            :skip-meta [:hurl]}
           {:id :integration
            :ns-patterns [".*-test"]
-           :focus-meta [:hurl]}]
+           :focus-meta [:hurl]}
+          {:id :sql
+           :ns-patterns [".*-test"]
+           :focus-meta [:sql]}]
 :capture-output? false
 :reporter [kaocha.report/documentation]
; :plugins [:kaocha.plugin/junit-xml]