diff --git a/.github/integration/scripts/make_sda_credentials.sh b/.github/integration/scripts/make_sda_credentials.sh index 5f0bd3f0a..fb1908a83 100644 --- a/.github/integration/scripts/make_sda_credentials.sh +++ b/.github/integration/scripts/make_sda_credentials.sh @@ -17,6 +17,7 @@ pip install aiohttp Authlib joserfc requests > /dev/null for n in download finalize inbox ingest mapper sync verify; do echo "creating credentials for: $n" psql -U postgres -h postgres -d sda -c "ALTER ROLE $n LOGIN PASSWORD '$n';" + psql -U postgres -h postgres -d sda -c "GRANT base TO $n;" ## password and permissions for MQ body_data=$(jq -n -c --arg password "$n" --arg tags none '$ARGS.named') @@ -60,6 +61,11 @@ if [ ! -f "/shared/c4gh.sec.pem" ]; then curl -s -L https://github.com/neicnordic/crypt4gh/releases/download/v1.7.4/crypt4gh_linux_x86_64.tar.gz | tar -xz -C /shared/ && chmod +x /shared/crypt4gh /shared/crypt4gh generate -n /shared/c4gh -p c4ghpass fi +if [ ! -f "/shared/sync.sec.pem" ]; then + echo "creating crypth4gh key" + curl -s -L https://github.com/neicnordic/crypt4gh/releases/download/v1.7.4/crypt4gh_linux_x86_64.tar.gz | tar -xz -C /shared/ && chmod +x /shared/crypt4gh + /shared/crypt4gh generate -n /shared/sync -p syncPass +fi if [ ! -f "/shared/keys/ssh" ]; then ssh-keygen -o -a 256 -t ed25519 -f /shared/keys/ssh -N "" diff --git a/.github/integration/sda-s3-integration.yml b/.github/integration/sda-s3-integration.yml index 070e70859..ffffd50b6 100644 --- a/.github/integration/sda-s3-integration.yml +++ b/.github/integration/sda-s3-integration.yml @@ -208,6 +208,49 @@ services: - ./sda/config.yaml:/config.yaml - shared:/shared + sync: + image: ghcr.io/neicnordic/sensitive-data-archive:PR${PR_NUMBER} + command: [ sda-sync ] + container_name: sync + depends_on: + credentials: + condition: service_completed_successfully + minio: + condition: service_healthy + postgres: + condition: service_healthy + rabbitmq: + condition: service_healthy + environment: + - BROKER_PASSWORD=sync + - BROKER_USER=sync + - BROKER_QUEUE=mapping_stream + - DB_PASSWORD=sync + - DB_USER=sync + restart: always + volumes: + - ./sda/config.yaml:/config.yaml + - shared:/shared + + sync-api: + image: ghcr.io/neicnordic/sensitive-data-archive:PR${PR_NUMBER} + command: [ sda-syncapi ] + container_name: sync-api + depends_on: + credentials: + condition: service_completed_successfully + rabbitmq: + condition: service_healthy + environment: + - BROKER_PASSWORD=sync + - BROKER_USER=sync + - BROKER_EXCHANGE=sda.dead + ports: + - "18080:8080" + restart: always + volumes: + - ./sda/config.yaml:/config.yaml + oidc: container_name: oidc command: @@ -250,6 +293,10 @@ services: condition: service_started s3inbox: condition: service_started + sync: + condition: service_started + sync-api: + condition: service_started verify: condition: service_started environment: diff --git a/.github/integration/sda/config.yaml b/.github/integration/sda/config.yaml index 15e949f6e..9b44e5d75 100644 --- a/.github/integration/sda/config.yaml +++ b/.github/integration/sda/config.yaml @@ -50,6 +50,7 @@ db: c4gh: filePath: /shared/c4gh.sec.pem passphrase: "c4ghpass" + syncPubKeyPath: /shared/sync.pub.pem server: cert: "" @@ -57,4 +58,24 @@ server: jwtpubkeypath: "/shared/keys/pub/" jwtpubkeyurl: "http://oidc:8080/jwk" +sync: + api: + password: "pass" + user: "user" + centerPrefix: "SYNC" + destination: + type: "s3" + url: "http://s3" + port: 9000 + readypath: "/minio/health/ready" + accessKey: "access" + secretKey: "secretKey" + bucket: "sync" + region: 
"us-east-1" + remote: + host: "http://sync-api" + port: "8080" + password: "pass" + user: "user" +schema.type: "isolated" \ No newline at end of file diff --git a/.github/integration/tests/sda/10_upload_test.sh b/.github/integration/tests/sda/10_upload_test.sh index bfffa2201..49990f7bf 100644 --- a/.github/integration/tests/sda/10_upload_test.sh +++ b/.github/integration/tests/sda/10_upload_test.sh @@ -34,7 +34,7 @@ done psql -U postgres -h postgres -d sda -At -c "TRUNCATE TABLE sda.files CASCADE;" if [ "$STORAGETYPE" = "posix" ]; then - for file in NA12878.bam NA12878_20k_b37.bam; do + for file in NA12878.bam NA12878_20k_b37.bam NA12878.bai NA12878_20k_b37.bai; do echo "downloading $file" curl -s -L -o /shared/$file "https://github.com/ga4gh/htsget-refserver/raw/main/data/gcp/gatk-test-data/wgs_bam/$file" if [ ! -f "$file.c4gh" ]; then @@ -70,7 +70,7 @@ fi if [ "$STORAGETYPE" = "s3" ]; then pip -q install s3cmd - for file in NA12878.bam NA12878_20k_b37.bam; do + for file in NA12878.bam NA12878_20k_b37.bam NA12878.bai NA12878_20k_b37.bai; do curl -s -L -o /shared/$file "https://github.com/ga4gh/htsget-refserver/raw/main/data/gcp/gatk-test-data/wgs_bam/$file" if [ ! -f "$file.c4gh" ]; then yes | /shared/crypt4gh encrypt -p c4gh.pub.pem -f "$file" @@ -87,7 +87,7 @@ fi echo "waiting for upload to complete" RETRY_TIMES=0 -until [ "$(curl -s -k -u guest:guest $URI/api/queues/sda/inbox | jq -r '."messages_ready"')" -eq 4 ]; do +until [ "$(curl -s -k -u guest:guest $URI/api/queues/sda/inbox | jq -r '."messages_ready"')" -eq 6 ]; do echo "waiting for upload to complete" RETRY_TIMES=$((RETRY_TIMES + 1)) if [ "$RETRY_TIMES" -eq 30 ]; then @@ -99,14 +99,14 @@ done if [ "$STORAGETYPE" = "s3" ]; then num_rows=$(psql -U postgres -h postgres -d sda -At -c "SELECT COUNT(*) from sda.files;") - if [ "$num_rows" -ne 3 ]; then - echo "database queries for register_files failed, expected 3 got $num_rows" + if [ "$num_rows" -ne 5 ]; then + echo "database queries for register_files failed, expected 5 got $num_rows" exit 1 fi num_log_rows=$(psql -U postgres -h postgres -d sda -At -c "SELECT COUNT(*) from sda.file_event_log;") - if [ "$num_log_rows" -ne 8 ]; then - echo "database queries for file_event_logs failed, expected 8 got $num_log_rows" + if [ "$num_log_rows" -ne 12 ]; then + echo "database queries for file_event_logs failed, expected 12 got $num_log_rows" exit 1 fi @@ -120,7 +120,7 @@ if [ "$STORAGETYPE" = "s3" ]; then ## verify that messages exists in MQ echo "waiting for upload to complete" RETRY_TIMES=0 - until [ "$(curl -s -k -u guest:guest $URI/api/queues/sda/inbox | jq -r '."messages_ready"')" -eq 5 ]; do + until [ "$(curl -s -k -u guest:guest $URI/api/queues/sda/inbox | jq -r '."messages_ready"')" -eq 7 ]; do echo "waiting for upload to complete" RETRY_TIMES=$((RETRY_TIMES + 1)) if [ "$RETRY_TIMES" -eq 30 ]; then @@ -131,14 +131,14 @@ if [ "$STORAGETYPE" = "s3" ]; then done num_rows=$(psql -U postgres -h postgres -d sda -At -c "SELECT COUNT(*) from sda.files;") - if [ "$num_rows" -ne 4 ]; then - echo "database queries for register_files failed, expected 4 got $num_rows" + if [ "$num_rows" -ne 6 ]; then + echo "database queries for register_files failed, expected 6 got $num_rows" exit 1 fi num_log_rows=$(psql -U postgres -h postgres -d sda -At -c "SELECT COUNT(*) from sda.file_event_log;") - if [ "$num_log_rows" -ne 10 ]; then - echo "database queries for file_event_logs failed, expected 10 got $num_log_rows" + if [ "$num_log_rows" -ne 14 ]; then + echo "database queries for file_event_logs failed, 
expected 14 got $num_log_rows" exit 1 fi fi diff --git a/.github/integration/tests/sda/20_ingest-verify_test.sh b/.github/integration/tests/sda/20_ingest-verify_test.sh index 2935a817e..a76aaf051 100644 --- a/.github/integration/tests/sda/20_ingest-verify_test.sh +++ b/.github/integration/tests/sda/20_ingest-verify_test.sh @@ -3,7 +3,7 @@ set -e cd shared || true -for file in NA12878.bam NA12878_20k_b37.bam; do +for file in NA12878.bam NA12878_20k_b37.bam NA12878.bai NA12878_20k_b37.bai; do ENC_SHA=$(sha256sum "$file.c4gh" | cut -d' ' -f 1) ENC_MD5=$(md5sum "$file.c4gh" | cut -d' ' -f 1) @@ -59,7 +59,7 @@ done echo "waiting for verify to complete" RETRY_TIMES=0 -until [ "$(curl -su guest:guest http://rabbitmq:15672/api/queues/sda/verified/ | jq -r '.messages_ready')" -eq 2 ]; do +until [ "$(curl -su guest:guest http://rabbitmq:15672/api/queues/sda/verified/ | jq -r '.messages_ready')" -eq 4 ]; do echo "waiting for verify to complete" RETRY_TIMES=$((RETRY_TIMES + 1)) if [ "$RETRY_TIMES" -eq 30 ]; then diff --git a/.github/integration/tests/sda/21_cancel_test.sh b/.github/integration/tests/sda/21_cancel_test.sh index a76ab10a0..87f43fa62 100644 --- a/.github/integration/tests/sda/21_cancel_test.sh +++ b/.github/integration/tests/sda/21_cancel_test.sh @@ -86,11 +86,11 @@ curl -k -u guest:guest "http://rabbitmq:15672/api/exchanges/sda/sda/publish" \ -d "$ingest_body" RETRY_TIMES=0 -until [ "$(curl -su guest:guest http://rabbitmq:15672/api/queues/sda/verified/ | jq -r '.messages_ready')" -eq 3 ]; do - echo "waiting for verify to complete" +until [ "$(curl -su guest:guest http://rabbitmq:15672/api/queues/sda/verified/ | jq -r '.messages_ready')" -eq 5 ]; do + echo "waiting for verify to complete after re-ingestion" RETRY_TIMES=$((RETRY_TIMES + 1)) if [ "$RETRY_TIMES" -eq 30 ]; then - echo "::error::Time out while waiting for verify to complete" + echo "::error::Time out while waiting for verify to complete after re-ingestion" exit 1 fi sleep 2 diff --git a/.github/integration/tests/sda/30_backup-finalize_test.sh b/.github/integration/tests/sda/30_backup-finalize_test.sh index 9e2c30087..afb0cef56 100644 --- a/.github/integration/tests/sda/30_backup-finalize_test.sh +++ b/.github/integration/tests/sda/30_backup-finalize_test.sh @@ -4,7 +4,7 @@ set -e cd shared || true i=1 -while [ $i -le 2 ]; do +while [ $i -le 4 ]; do ## get correlation id from upload message MSG=$( curl -s -X POST \ @@ -28,12 +28,17 @@ while [ $i -le 2 ]; do '$ARGS.named' ) + accession_id=EGAF7490000000$i + if [[ "$filepath" == *.bai.c4gh ]]; then + accession_id="SYNC-123-0000$i" + fi + accession_payload=$( jq -r -c -n \ --arg type accession \ --arg user "$user" \ --arg filepath "$filepath" \ - --arg accession_id "EGAF7490000000$i" \ + --arg accession_id "$accession_id" \ --argjson decrypted_checksums "$decrypted_checksums" \ '$ARGS.named|@base64' ) @@ -58,7 +63,7 @@ done echo "waiting for finalize to complete" -until [ "$(curl -su guest:guest http://rabbitmq:15672/api/queues/sda/completed/ | jq -r '.messages_ready')" -eq 2 ]; do +until [ "$(curl -su guest:guest http://rabbitmq:15672/api/queues/sda/completed/ | jq -r '.messages_ready')" -eq 4 ]; do echo "waiting for finalize to complete" RETRY_TIMES=$((RETRY_TIMES + 1)) if [ "$RETRY_TIMES" -eq 30 ]; then @@ -88,7 +93,7 @@ socket_timeout = 30 EOD # check DB for archive file names -for file in NA12878.bam.c4gh NA12878_20k_b37.bam.c4gh; do +for file in NA12878.bam.c4gh NA12878.bai.c4gh NA12878_20k_b37.bam.c4gh NA12878_20k_b37.bai.c4gh; do archiveName=$(psql -U postgres -h postgres 
-d sda -At -c "SELECT archive_file_path from sda.files where submission_file_path = 'test_dummy.org/$file';") size=$(s3cmd -c direct ls s3://backup/"$archiveName" | tr -s ' ' | cut -d ' ' -f 3) if [ "$size" -eq 0 ]; then diff --git a/.github/integration/tests/sda/40_mapper_test.sh b/.github/integration/tests/sda/40_mapper_test.sh index 7de50977f..dc5234b6a 100644 --- a/.github/integration/tests/sda/40_mapper_test.sh +++ b/.github/integration/tests/sda/40_mapper_test.sh @@ -44,7 +44,7 @@ curl -s -u guest:guest "http://rabbitmq:15672/api/exchanges/sda/sda/publish" \ # check DB for dataset contents RETRY_TIMES=0 -until [ "$(psql -U postgres -h postgres -d sda -At -c "select count(id) from sda.file_dataset where dataset_id = (select id from sda.datasets where stable_id = 'EGAD74900000101')")" -eq 2 ]; do +until [ "$(psql -U postgres -h postgres -d sda -At -c "select count(id) from sda.file_dataset where dataset_id = (select id from sda.datasets where stable_id = 'EGAD74900000101');")" -eq 2 ]; do echo "waiting for mapper to complete" RETRY_TIMES=$((RETRY_TIMES + 1)) if [ "$RETRY_TIMES" -eq 30 ]; then @@ -63,7 +63,7 @@ for file in NA12878.bam.c4gh NA12878_20k_b37.bam.c4gh; do fi done -until [ "$(psql -U postgres -h postgres -d sda -At -c "select event from sda.file_event_log where file_id = (select id from sda.files where stable_id = 'EGAF74900000002') order by started_at DESC LIMIT 1")" = "ready" ]; do +until [ "$(psql -U postgres -h postgres -d sda -At -c "select event from sda.file_event_log where file_id = (select id from sda.files where stable_id = 'EGAF74900000002') order by started_at DESC LIMIT 1;")" = "ready" ]; do echo "waiting for files be ready" RETRY_TIMES=$((RETRY_TIMES + 1)) if [ "$RETRY_TIMES" -eq 30 ]; then @@ -73,7 +73,7 @@ until [ "$(psql -U postgres -h postgres -d sda -At -c "select event from sda.fil sleep 2 done -until [ "$(psql -U postgres -h postgres -d sda -At -c "select event from sda.dataset_event_log where dataset_id = 'EGAD74900000101' order by event_date DESC LIMIT 1")" = "registered" ]; do +until [ "$(psql -U postgres -h postgres -d sda -At -c "select event from sda.dataset_event_log where dataset_id = 'EGAD74900000101' order by event_date DESC LIMIT 1;")" = "registered" ]; do echo "waiting for dataset be registered" RETRY_TIMES=$((RETRY_TIMES + 1)) if [ "$RETRY_TIMES" -eq 30 ]; then @@ -108,7 +108,7 @@ curl -s -u guest:guest "http://rabbitmq:15672/api/exchanges/sda/sda/publish" \ -H 'Content-Type: application/json;charset=UTF-8' \ -d "$release_body" -until [ "$(psql -U postgres -h postgres -d sda -At -c "select event from sda.dataset_event_log where dataset_id = 'EGAD74900000101' order by event_date DESC LIMIT 1")" = "released" ]; do +until [ "$(psql -U postgres -h postgres -d sda -At -c "select event from sda.dataset_event_log where dataset_id = 'EGAD74900000101' order by event_date DESC LIMIT 1;")" = "released" ]; do echo "waiting for dataset be released" RETRY_TIMES=$((RETRY_TIMES + 1)) if [ "$RETRY_TIMES" -eq 30 ]; then @@ -153,4 +153,46 @@ until [ "$(psql -U postgres -h postgres -d sda -At -c "select event from sda.dat sleep 2 done -echo "dataset deprecated successfully" \ No newline at end of file +echo "dataset deprecated successfully" + +mappings=$( + jq -c -n \ + '$ARGS.positional' \ + --args "SYNC-123-00003" \ + --args "SYNC-123-00004" +) + +mapping_payload=$( + jq -r -c -n \ + --arg type mapping \ + --arg dataset_id SYNC-001-12345 \ + --argjson accession_ids "$mappings" \ + '$ARGS.named|@base64' +) + +mapping_body=$( + jq -c -n \ + --arg vhost test \ + 
--arg name sda \
+        --argjson properties "$properties" \
+        --arg routing_key "mappings" \
+        --arg payload_encoding base64 \
+        --arg payload "$mapping_payload" \
+        '$ARGS.named'
+)
+
+curl -s -u guest:guest "http://rabbitmq:15672/api/exchanges/sda/sda/publish" \
+    -H 'Content-Type: application/json;charset=UTF-8' \
+    -d "$mapping_body"
+
+# check DB for dataset contents
+RETRY_TIMES=0
+until [ "$(psql -U postgres -h postgres -d sda -At -c "select count(id) from sda.file_dataset where dataset_id = (select id from sda.datasets where stable_id = 'SYNC-001-12345')")" -eq 2 ]; do
+    echo "waiting for mapper to complete"
+    RETRY_TIMES=$((RETRY_TIMES + 1))
+    if [ "$RETRY_TIMES" -eq 30 ]; then
+        echo "::error::Time out while waiting for dataset to be mapped"
+        exit 1
+    fi
+    sleep 2
+done
\ No newline at end of file
diff --git a/.github/integration/tests/sda/45_sync_test.sh b/.github/integration/tests/sda/45_sync_test.sh
new file mode 100644
index 000000000..f90a5d014
--- /dev/null
+++ b/.github/integration/tests/sda/45_sync_test.sh
@@ -0,0 +1,35 @@
+#!/bin/bash
+set -e
+
+cd shared || true
+
+if [ "$STORAGETYPE" = "posix" ]; then
+    exit 0
+fi
+
+# check bucket for synced files
+for file in NA12878.bai NA12878_20k_b37.bai; do
+    RETRY_TIMES=0
+    until [ "$(s3cmd -c direct ls s3://sync/test_dummy.org/"$file")" != "" ]; do
+        RETRY_TIMES=$((RETRY_TIMES + 1))
+        if [ "$RETRY_TIMES" -eq 30 ]; then
+            echo "::error::Time out while waiting for files to be synced"
+            exit 1
+        fi
+        sleep 2
+    done
+done
+
+echo "files synced successfully"
+
+echo "waiting for sync-api to send messages"
+RETRY_TIMES=0
+until [ "$(curl -su guest:guest http://rabbitmq:15672/api/queues/sda/catch_all.dead/ | jq -r '.messages_ready')" -eq 5 ]; do
+    echo "waiting for sync-api to send messages"
+    RETRY_TIMES=$((RETRY_TIMES + 1))
+    if [ "$RETRY_TIMES" -eq 30 ]; then
+        echo "::error::Time out while waiting for sync-api to send messages"
+        exit 1
+    fi
+    sleep 2
+done
\ No newline at end of file
diff --git a/sda/cmd/finalize/finalize.go b/sda/cmd/finalize/finalize.go
index 7028c6f30..6ab14c25f 100644
--- a/sda/cmd/finalize/finalize.go
+++ b/sda/cmd/finalize/finalize.go
@@ -90,6 +90,11 @@ func main() {
 		status, err := db.GetFileStatus(delivered.CorrelationId)
 		if err != nil {
 			log.Errorf("failed to get file status, reason: %v", err)
+			if err := delivered.Nack(false, true); err != nil {
+				log.Errorf("failed to Nack message, reason: (%v)", err)
+			}
+
+			continue
 		}
 		if status == "disabled" {
 			log.Infof("file with correlation ID: %s is disabled, stopping work", delivered.CorrelationId)
diff --git a/sda/cmd/sync/sync.go b/sda/cmd/sync/sync.go
new file mode 100644
index 000000000..4f120a018
--- /dev/null
+++ b/sda/cmd/sync/sync.go
@@ -0,0 +1,292 @@
+// The sync command accepts messages with accessionIDs for
+// ingested files and copies them to the sync destination.
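+//
+// For each dataset-mapping message read from the mapping stream, the service
+// checks the center prefix of the dataset ID, re-encrypts the crypt4gh header
+// of every listed file with the sync public key, writes the header and the
+// archived data to the sync destination, and notifies the remote sync-api
+// through a POST to its /dataset endpoint.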
+package main + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strings" + "time" + + "github.com/neicnordic/crypt4gh/model/headers" + "github.com/neicnordic/sensitive-data-archive/internal/broker" + "github.com/neicnordic/sensitive-data-archive/internal/config" + "github.com/neicnordic/sensitive-data-archive/internal/database" + "github.com/neicnordic/sensitive-data-archive/internal/schema" + "github.com/neicnordic/sensitive-data-archive/internal/storage" + log "github.com/sirupsen/logrus" + "golang.org/x/crypto/chacha20poly1305" +) + +var ( + err error + key, publicKey *[32]byte + db *database.SDAdb + conf *config.Config + archive, syncDestination storage.Backend +) + +func main() { + forever := make(chan bool) + conf, err = config.NewConfig("sync") + if err != nil { + log.Fatal(err) + } + mq, err := broker.NewMQ(conf.Broker) + if err != nil { + log.Fatal(err) + } + db, err = database.NewSDAdb(conf.Database) + if err != nil { + log.Fatal(err) + } + + syncDestination, err = storage.NewBackend(conf.Sync.Destination) + if err != nil { + log.Fatal(err) + } + archive, err = storage.NewBackend(conf.Archive) + if err != nil { + log.Fatal(err) + } + + key, err = config.GetC4GHKey() + if err != nil { + log.Fatal(err) + } + + publicKey, err = config.GetC4GHPublicKey() + if err != nil { + log.Fatal(err) + } + + defer mq.Channel.Close() + defer mq.Connection.Close() + defer db.Close() + + go func() { + connError := mq.ConnectionWatcher() + log.Error(connError) + forever <- false + }() + + go func() { + connError := mq.ChannelWatcher() + log.Error(connError) + forever <- false + }() + + log.Info("Starting sync service") + var message schema.DatasetMapping + + go func() { + messages, err := mq.GetMessages(conf.Broker.Queue) + if err != nil { + log.Fatal(err) + } + for delivered := range messages { + log.Debugf("Received a message (corr-id: %s, message: %s)", + delivered.CorrelationId, + delivered.Body) + + err := schema.ValidateJSON(fmt.Sprintf("%s/dataset-mapping.json", conf.Broker.SchemasPath), delivered.Body) + if err != nil { + log.Errorf("validation of incoming message (dataset-mapping) failed, reason: (%s)", err.Error()) + // Send the message to an error queue so it can be analyzed. 
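+			// The original message is Ack'ed below once the error has been
+			// published, so it is not redelivered; the copy on the error
+			// queue is kept for later analysis.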
+ infoErrorMessage := broker.InfoError{ + Error: "Message validation failed in sync service", + Reason: err.Error(), + OriginalMessage: string(delivered.Body), + } + + body, _ := json.Marshal(infoErrorMessage) + if err := mq.SendMessage(delivered.CorrelationId, conf.Broker.Exchange, "error", body); err != nil { + log.Errorf("failed to publish message, reason: (%s)", err.Error()) + } + if err := delivered.Ack(false); err != nil { + log.Errorf("failed to Ack message, reason: (%s)", err.Error()) + } + + continue + } + + // we unmarshal the message in the validation step so this is safe to do + _ = json.Unmarshal(delivered.Body, &message) + + if !strings.HasPrefix(message.DatasetID, conf.Sync.CenterPrefix) { + log.Infoln("external dataset") + if err := delivered.Ack(false); err != nil { + log.Errorf("failed to Ack message, reason: (%s)", err.Error()) + } + + continue + } + + for _, aID := range message.AccessionIDs { + if err := syncFiles(aID); err != nil { + log.Errorf("failed to sync archived file %s, reason: (%s)", aID, err.Error()) + if err := delivered.Nack(false, false); err != nil { + log.Errorf("failed to nack following GetFileSize error message") + } + + continue + } + } + + log.Infoln("buildSyncDatasetJSON") + blob, err := buildSyncDatasetJSON(delivered.Body) + if err != nil { + log.Errorf("failed to build SyncDatasetJSON, Reason: %v", err) + } + if err := sendPOST(blob); err != nil { + log.Errorf("failed to send POST, Reason: %v", err) + if err := delivered.Nack(false, false); err != nil { + log.Errorf("failed to nack following sendPOST error message") + } + + continue + } + + if err := delivered.Ack(false); err != nil { + log.Errorf("failed to Ack message, reason: (%s)", err.Error()) + } + } + }() + + <-forever +} + +func syncFiles(stableID string) error { + log.Debugf("syncing file %s", stableID) + inboxPath, err := db.GetInboxPath(stableID) + if err != nil { + return fmt.Errorf("failed to get inbox path for file with stable ID: %s", stableID) + } + + archivePath, err := db.GetArchivePath(stableID) + if err != nil { + return fmt.Errorf("failed to get archive path for file with stable ID: %s", stableID) + } + + fileSize, err := archive.GetFileSize(archivePath) + if err != nil { + return err + } + + file, err := archive.NewFileReader(archivePath) + if err != nil { + return err + } + defer file.Close() + + dest, err := syncDestination.NewFileWriter(inboxPath) + if err != nil { + return err + } + defer dest.Close() + + header, err := db.GetHeaderForStableID(stableID) + if err != nil { + return err + } + + pubkeyList := [][chacha20poly1305.KeySize]byte{} + pubkeyList = append(pubkeyList, *publicKey) + newHeader, err := headers.ReEncryptHeader(header, *key, pubkeyList) + if err != nil { + return err + } + + _, err = dest.Write(newHeader) + if err != nil { + return err + } + + // Copy the file and check is sizes match + copiedSize, err := io.Copy(dest, file) + if err != nil || copiedSize != int64(fileSize) { + switch { + case copiedSize != int64(fileSize): + return fmt.Errorf("copied size does not match file size") + default: + return err + } + } + + return nil +} + +func buildSyncDatasetJSON(b []byte) ([]byte, error) { + var msg schema.DatasetMapping + _ = json.Unmarshal(b, &msg) + + var dataset = schema.SyncDataset{ + DatasetID: msg.DatasetID, + } + + for _, ID := range msg.AccessionIDs { + data, err := db.GetSyncData(ID) + if err != nil { + return nil, err + } + datasetFile := schema.DatasetFiles{ + FilePath: data.FilePath, + FileID: ID, + ShaSum: data.Checksum, + } + 
dataset.DatasetFiles = append(dataset.DatasetFiles, datasetFile)
+        dataset.User = data.User
+    }
+
+    json, err := json.Marshal(dataset)
+    if err != nil {
+        return nil, err
+    }
+
+    return json, nil
+}
+
+func sendPOST(payload []byte) error {
+    client := &http.Client{
+        Timeout: 30 * time.Second,
+    }
+
+    URL, err := createHostURL(conf.Sync.RemoteHost, conf.Sync.RemotePort)
+    if err != nil {
+        return err
+    }
+
+    req, err := http.NewRequest(http.MethodPost, URL, bytes.NewBuffer(payload))
+    if err != nil {
+        return err
+    }
+    req.Header.Set("Content-Type", "application/json")
+    req.SetBasicAuth(conf.Sync.RemoteUser, conf.Sync.RemotePassword)
+    resp, err := client.Do(req)
+    if err != nil {
+        return err
+    }
+    if resp.StatusCode != http.StatusOK {
+        return fmt.Errorf("%s", resp.Status)
+    }
+    defer resp.Body.Close()
+
+    return nil
+}
+
+func createHostURL(host string, port int) (string, error) {
+    url, err := url.ParseRequestURI(host)
+    if err != nil {
+        return "", err
+    }
+    if url.Port() == "" && port != 0 {
+        url.Host += fmt.Sprintf(":%d", port)
+    }
+    url.Path = "/dataset"
+
+    return url.String(), nil
+}
diff --git a/sda/cmd/sync/sync.md b/sda/cmd/sync/sync.md
new file mode 100644
index 000000000..84edf4f2c
--- /dev/null
+++ b/sda/cmd/sync/sync.md
@@ -0,0 +1,148 @@
+# Sync
+
+The sync service is used in the [Bigpicture](https://bigpicture.eu/) project.
+
+It copies files from the archive to the sync destination, including the header, so that the files can be ingested at the remote site.
+
+## Configuration
+
+There are a number of options that can be set for the sync service.
+These settings can be set by mounting a yaml-file at `/config.yaml` with settings.
+
+ex.
+
+```yaml
+log:
+  level: "debug"
+  format: "json"
+```
+
+They may also be set using environment variables like:
+
+```bash
+export LOG_LEVEL="debug"
+export LOG_FORMAT="json"
+```
+
+### Service settings
+
+- `SYNC_CENTERPREFIX`: Prefix of the dataset ID, used to detect whether the dataset was minted locally or not
+- `SYNC_REMOTE_HOST`: URL to the remote API host
+- `SYNC_REMOTE_PORT`: Port for the remote API host, if other than the standard HTTP(S) ports
+- `SYNC_REMOTE_USER`: Username for connecting to the remote API
+- `SYNC_REMOTE_PASSWORD`: Password for the API user
+
+### Keyfile settings
+
+These settings control which crypt4gh keyfile is loaded.
+
+- `C4GH_FILEPATH`: path to the crypt4gh keyfile
+- `C4GH_PASSPHRASE`: pass phrase to unlock the keyfile
+- `C4GH_SYNCPUBKEYPATH`: path to the crypt4gh public key to use for re-encrypting file headers
+
+### RabbitMQ broker settings
+
+These settings control how sync connects to the RabbitMQ message broker.
+
+- `BROKER_HOST`: hostname of the rabbitmq server
+- `BROKER_PORT`: rabbitmq broker port (commonly `5671` with TLS and `5672` without)
+- `BROKER_QUEUE`: message queue or stream to read messages from (commonly `mapping_stream`)
+- `BROKER_USER`: username to connect to rabbitmq
+- `BROKER_PASSWORD`: password to connect to rabbitmq
+- `BROKER_PREFETCHCOUNT`: Number of messages to pull from the message server at a time (defaults to 2)
+
+### PostgreSQL Database settings
+
+- `DB_HOST`: hostname for the postgresql database
+- `DB_PORT`: database port (commonly 5432)
+- `DB_USER`: username for the database
+- `DB_PASSWORD`: password for the database
+- `DB_DATABASE`: database name
+- `DB_SSLMODE`: The TLS encryption policy to use for database connections. Valid options are:
+  - `disable`
+  - `allow`
+  - `prefer`
+  - `require`
+  - `verify-ca`
+  - `verify-full`
+
+  More information is available [in the postgresql documentation](https://www.postgresql.org/docs/current/libpq-ssl.html#LIBPQ-SSL-PROTECTION)
+
+Note that if `DB_SSLMODE` is set to anything but `disable`, then `DB_CACERT` needs to be set, and if set to `verify-full`, then `DB_CLIENTCERT` and `DB_CLIENTKEY` must also be set
+
+- `DB_CLIENTKEY`: key-file for the database client certificate
+- `DB_CLIENTCERT`: database client certificate file
+- `DB_CACERT`: Certificate Authority (CA) certificate for the database to use
+
+### Storage settings
+
+The storage backends are defined by the `ARCHIVE_TYPE` and `SYNC_DESTINATION_TYPE` variables.
+Valid values for these options are `S3` or `POSIX` for `ARCHIVE_TYPE`, and `POSIX`, `S3` or `SFTP` for `SYNC_DESTINATION_TYPE`.
+
+The value of these variables defines which other variables are read.
+The same variables are available for all storage types, differing only by prefix (`ARCHIVE_` or `SYNC_DESTINATION_`).
+
+If `*_TYPE` is `S3`, then the following variables are available:
+
+- `*_URL`: URL to the S3 system
+- `*_ACCESSKEY`: The S3 access and secret key are used to authenticate to S3, [more info at AWS](https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys)
+- `*_SECRETKEY`: The S3 access and secret key are used to authenticate to S3, [more info at AWS](https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys)
+- `*_BUCKET`: The S3 bucket to use as the storage root
+- `*_PORT`: S3 connection port (default: `443`)
+- `*_REGION`: S3 region (default: `us-east-1`)
+- `*_CHUNKSIZE`: S3 chunk size for multipart uploads
+- `*_CACERT`: Certificate Authority (CA) certificate for the storage system; only needed if the S3 server has a certificate signed by a private entity
+
+If `*_TYPE` is `POSIX`:
+
+- `*_LOCATION`: POSIX path to use as storage root
+
+And if `*_TYPE` is `SFTP`:
+
+- `*_HOST`: URL to the SFTP server
+- `*_PORT`: Port of the SFTP server to connect to
+- `*_USERNAME`: Username for connecting to the SFTP server
+- `*_HOSTKEY`: The SFTP server's public key
+- `*_PEMKEYPATH`: Path to the ssh private key used to connect to the SFTP server
+- `*_PEMKEYPASS`: Passphrase for the ssh private key
+
+### Logging settings
+
+- `LOG_FORMAT` can be set to “json” to get logs in json format. All other values result in text logging
+- `LOG_LEVEL` can be set to one of the following, in increasing order of severity:
+  - `trace`
+  - `debug`
+  - `info`
+  - `warn` (or `warning`)
+  - `error`
+  - `fatal`
+  - `panic`
+
+## Service Description
+
+The sync service copies files from the archive storage to sync storage.
+
+When running, sync reads messages from the "mapping_stream" RabbitMQ queue.
+For each message, these steps are taken (if not otherwise noted, errors halt progress, the message is Nack'ed, and the service moves on to the next message):
+
+1. The message is validated as valid JSON that matches the "dataset-mapping" schema. If the message can’t be validated it is sent to the error queue for later analysis.
+2. The center prefix of the dataset ID is checked to determine whether the dataset was created locally; if it is a remote ID, processing stops.
+3. For each stable ID in the dataset the following is performed:
+    1. The archive file path and file size are fetched from the database.
+    2. The file size on disk is requested from the storage system.
+    3. A file reader is created for the archive storage file, and a file writer is created for the sync storage file.
+        1. The header is read from the database.
+        2. The header is decrypted.
+        3. The header is re-encrypted with the destination's public key.
+        4. The header is written to the sync file writer.
+    4. The file data is copied from the archive file reader to the sync file writer.
+4. Once all files have been copied to the destination, a JSON struct is created according to the `file-sync` schema.
+5. A POST message is sent to the remote API host with the JSON data.
+6. The message is Ack'ed.
+
+## Communication
+
+- Sync reads messages from one rabbitmq stream (`mapping_stream`)
+- Sync reads file information and headers from the database and cannot be started without a database connection.
+- Sync re-encrypts the header with the receiving end's public key.
+- Sync reads data from archive storage and writes data to sync destination storage with the re-encrypted headers attached.
diff --git a/sda/cmd/sync/sync_test.go b/sda/cmd/sync/sync_test.go
new file mode 100644
index 000000000..cc7efd046
--- /dev/null
+++ b/sda/cmd/sync/sync_test.go
@@ -0,0 +1,225 @@
+package main
+
+import (
+    "context"
+    "crypto/sha256"
+    "database/sql"
+    "fmt"
+    "net/http"
+    "net/http/httptest"
+    "os"
+    "path"
+    "runtime"
+    "strconv"
+    "testing"
+    "time"
+
+    "github.com/google/uuid"
+    "github.com/neicnordic/sensitive-data-archive/internal/config"
+    "github.com/neicnordic/sensitive-data-archive/internal/database"
+    "github.com/ory/dockertest/v3"
+    "github.com/ory/dockertest/v3/docker"
+    log "github.com/sirupsen/logrus"
+    "github.com/spf13/viper"
+    "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/suite"
+)
+
+var dbPort int
+
+type SyncTest struct {
+    suite.Suite
+}
+
+func TestSyncTestSuite(t *testing.T) {
+    suite.Run(t, new(SyncTest))
+}
+
+func TestMain(m *testing.M) {
+    if _, err := os.Stat("/.dockerenv"); err == nil {
+        m.Run()
+    }
+    _, b, _, _ := runtime.Caller(0)
+    rootDir := path.Join(path.Dir(b), "../../../")
+
+    // uses a sensible default on windows (tcp/http) and linux/osx (socket)
+    pool, err := dockertest.NewPool("")
+    if err != nil {
+        log.Fatalf("Could not construct pool: %s", err)
+    }
+
+    // uses pool to try to connect to Docker
+    err = pool.Client.Ping()
+    if err != nil {
+        log.Fatalf("Could not connect to Docker: %s", err)
+    }
+
+    // pulls an image, creates a container based on it and runs it
+    postgres, err := pool.RunWithOptions(&dockertest.RunOptions{
+        Repository: "postgres",
+        Tag:        "15.2-alpine3.17",
+        Env: []string{
+            "POSTGRES_PASSWORD=rootpasswd",
+            "POSTGRES_DB=sda",
+        },
+        Mounts: []string{
+            fmt.Sprintf("%s/postgresql/initdb.d:/docker-entrypoint-initdb.d", rootDir),
+        },
+    }, func(config *docker.HostConfig) {
+        // set AutoRemove to true so that stopped container goes away by itself
+        config.AutoRemove = true
+        config.RestartPolicy = docker.RestartPolicy{
+            Name: "no",
+        }
+    })
+    if err != nil {
+        log.Fatalf("Could not start resource: %s", err)
+    }
+
+    dbHostAndPort := postgres.GetHostPort("5432/tcp")
+    dbPort, _ = strconv.Atoi(postgres.GetPort("5432/tcp"))
+    databaseURL := fmt.Sprintf("postgres://postgres:rootpasswd@%s/sda?sslmode=disable", dbHostAndPort)
+
+    pool.MaxWait = 120 * time.Second
+    if err = pool.Retry(func() error {
+        db, err := sql.Open("postgres", databaseURL)
+        if err != nil {
+            log.Println(err)
+
+            return err
+        }
+
+        query := "SELECT MAX(version) FROM sda.dbschema_version"
+        var dbVersion int
+
+        return db.QueryRow(query).Scan(&dbVersion)
+    }); err != nil {
log.Fatalf("Could not connect to postgres: %s", err) + } + + log.Println("starting tests") + _ = m.Run() + + log.Println("tests completed") + if err := pool.Purge(postgres); err != nil { + log.Fatalf("Could not purge resource: %s", err) + } + pvo := docker.PruneVolumesOptions{Filters: make(map[string][]string), Context: context.Background()} + if _, err := pool.Client.PruneVolumes(pvo); err != nil { + log.Fatalf("could not prune docker volumes: %s", err.Error()) + } +} + +func (suite *SyncTest) SetupTest() { + viper.Set("log.level", "debug") + viper.Set("archive.type", "posix") + viper.Set("archive.location", "../../dev_utils") + viper.Set("sync.destination.type", "posix") + viper.Set("sync.destination.location", "../../dev_utils") + + viper.Set("broker.host", "localhost") + viper.Set("broker.port", 123) + viper.Set("broker.user", "guest") + viper.Set("broker.password", "guest") + viper.Set("broker.queue", "test") + viper.Set("db.host", "localhost") + viper.Set("db.port", dbPort) + viper.Set("db.user", "postgres") + viper.Set("db.password", "rootpasswd") + viper.Set("db.database", "sda") + viper.Set("db.sslmode", "disable") + viper.Set("sync.centerPrefix", "prefix") + viper.Set("sync.remote.host", "http://remote.example") + viper.Set("sync.remote.user", "user") + viper.Set("sync.remote.password", "pass") + + key := "-----BEGIN CRYPT4GH ENCRYPTED PRIVATE KEY-----\nYzRnaC12MQAGc2NyeXB0ABQAAAAAEna8op+BzhTVrqtO5Rx7OgARY2hhY2hhMjBfcG9seTEzMDUAPMx2Gbtxdva0M2B0tb205DJT9RzZmvy/9ZQGDx9zjlObj11JCqg57z60F0KhJW+j/fzWL57leTEcIffRTA==\n-----END CRYPT4GH ENCRYPTED PRIVATE KEY-----" + keyPath, _ := os.MkdirTemp("", "key") + err := os.WriteFile(keyPath+"/c4gh.key", []byte(key), 0600) + assert.NoError(suite.T(), err) + + viper.Set("c4gh.filepath", keyPath+"/c4gh.key") + viper.Set("c4gh.passphrase", "test") + + pubKey := "-----BEGIN CRYPT4GH PUBLIC KEY-----\nuQO46R56f/Jx0YJjBAkZa2J6n72r6HW/JPMS4tfepBs=\n-----END CRYPT4GH PUBLIC KEY-----" + err = os.WriteFile(keyPath+"/c4gh.pub", []byte(pubKey), 0600) + assert.NoError(suite.T(), err) + viper.Set("c4gh.syncPubKeyPath", keyPath+"/c4gh.pub") + + defer os.RemoveAll(keyPath) +} + +func (suite *SyncTest) TestBuildSyncDatasetJSON() { + suite.SetupTest() + Conf, err := config.NewConfig("sync") + assert.NoError(suite.T(), err) + + db, err = database.NewSDAdb(Conf.Database) + assert.NoError(suite.T(), err) + + fileID, err := db.RegisterFile("dummy.user/test/file1.c4gh", "dummy.user") + assert.NoError(suite.T(), err, "failed to register file in database") + err = db.SetAccessionID("ed6af454-d910-49e3-8cda-488a6f246e67", fileID) + assert.NoError(suite.T(), err) + + checksum := fmt.Sprintf("%x", sha256.New().Sum(nil)) + fileInfo := database.FileInfo{Checksum: fmt.Sprintf("%x", sha256.New().Sum(nil)), Size: 1234, Path: "dummy.user/test/file1.c4gh", DecryptedChecksum: checksum, DecryptedSize: 999} + corrID := uuid.New().String() + + err = db.SetArchived(fileInfo, fileID, corrID) + assert.NoError(suite.T(), err, "failed to mark file as Archived") + err = db.SetVerified(fileInfo, fileID, corrID) + assert.NoError(suite.T(), err, "failed to mark file as Verified") + + accessions := []string{"ed6af454-d910-49e3-8cda-488a6f246e67"} + assert.NoError(suite.T(), db.MapFilesToDataset("cd532362-e06e-4461-8490-b9ce64b8d9e7", accessions), "failed to map file to dataset") + + m := []byte(`{"type":"mapping", "dataset_id": "cd532362-e06e-4461-8490-b9ce64b8d9e7", "accession_ids": ["ed6af454-d910-49e3-8cda-488a6f246e67"]}`) + jsonData, err := buildSyncDatasetJSON(m) + 
assert.NoError(suite.T(), err) + dataset := []byte(`{"dataset_id":"cd532362-e06e-4461-8490-b9ce64b8d9e7","dataset_files":[{"filepath":"dummy.user/test/file1.c4gh","file_id":"ed6af454-d910-49e3-8cda-488a6f246e67","sha256":"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"}],"user":"dummy.user"}`) + assert.Equal(suite.T(), string(dataset), string(jsonData)) +} + +func (suite *SyncTest) TestCreateHostURL() { + conf = &config.Config{} + conf.Sync = config.Sync{ + RemoteHost: "http://localhost", + RemotePort: 443, + } + + s, err := createHostURL(conf.Sync.RemoteHost, conf.Sync.RemotePort) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), "http://localhost:443/dataset", s) +} + +func (suite *SyncTest) TestSendPOST() { + r := http.NewServeMux() + r.HandleFunc("/dataset", func(w http.ResponseWriter, r *http.Request) { + username, _, ok := r.BasicAuth() + if ok && username == "foo" { + w.WriteHeader(http.StatusUnauthorized) + } + + w.WriteHeader(http.StatusOK) + }) + ts := httptest.NewServer(r) + defer ts.Close() + + conf = &config.Config{} + conf.Sync = config.Sync{ + RemoteHost: ts.URL, + RemoteUser: "test", + RemotePassword: "test", + } + syncJSON := []byte(`{"user":"test.user@example.com", "dataset_id": "cd532362-e06e-4460-8490-b9ce64b8d9e7", "dataset_files": [{"filepath": "inbox/user/file1.c4gh","file_id": "5fe7b660-afea-4c3a-88a9-3daabf055ebb", "sha256": "82E4e60e7beb3db2e06A00a079788F7d71f75b61a4b75f28c4c942703dabb6d6"}, {"filepath": "inbox/user/file2.c4gh","file_id": "ed6af454-d910-49e3-8cda-488a6f246e76", "sha256": "c967d96e56dec0f0cfee8f661846238b7f15771796ee1c345cae73cd812acc2b"}]}`) + err := sendPOST(syncJSON) + assert.NoError(suite.T(), err) + + conf.Sync = config.Sync{ + RemoteHost: ts.URL, + RemoteUser: "foo", + RemotePassword: "bar", + } + assert.EqualError(suite.T(), sendPOST(syncJSON), "401 Unauthorized") +} diff --git a/sda/cmd/syncapi/syncapi.go b/sda/cmd/syncapi/syncapi.go new file mode 100644 index 000000000..3ceaead07 --- /dev/null +++ b/sda/cmd/syncapi/syncapi.go @@ -0,0 +1,272 @@ +package main + +import ( + "crypto/sha256" + "crypto/subtle" + "crypto/tls" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "github.com/google/uuid" + "github.com/gorilla/mux" + "github.com/neicnordic/sensitive-data-archive/internal/broker" + "github.com/neicnordic/sensitive-data-archive/internal/config" + "github.com/neicnordic/sensitive-data-archive/internal/schema" + + log "github.com/sirupsen/logrus" +) + +var Conf *config.Config +var err error + +type syncDataset struct { + DatasetID string `json:"dataset_id"` + DatasetFiles []datasetFiles `json:"dataset_files"` + User string `json:"user"` +} + +type datasetFiles struct { + FilePath string `json:"filepath"` + FileID string `json:"file_id"` + ShaSum string `json:"sha256"` +} + +func main() { + Conf, err = config.NewConfig("sync-api") + if err != nil { + log.Fatal(err) + } + Conf.API.MQ, err = broker.NewMQ(Conf.Broker) + if err != nil { + log.Fatal(err) + } + + sigc := make(chan os.Signal, 5) + signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + go func() { + <-sigc + shutdown() + os.Exit(0) + }() + + srv := setup(Conf) + + if Conf.API.ServerCert != "" && Conf.API.ServerKey != "" { + log.Infof("Web server is ready to receive connections at https://%s:%d", Conf.API.Host, Conf.API.Port) + if err := srv.ListenAndServeTLS(Conf.API.ServerCert, Conf.API.ServerKey); err != nil { + shutdown() + log.Fatalln(err) + } + } else { + log.Infof("Web server 
is ready to receive connections at http://%s:%d", Conf.API.Host, Conf.API.Port) + if err := srv.ListenAndServe(); err != nil { + shutdown() + log.Fatalln(err) + } + } +} + +func setup(config *config.Config) *http.Server { + r := mux.NewRouter().SkipClean(true) + + r.HandleFunc("/ready", readinessResponse).Methods("GET") + r.HandleFunc("/dataset", basicAuth(http.HandlerFunc(dataset))).Methods("POST") + r.HandleFunc("/metadata", basicAuth(http.HandlerFunc(metadata))).Methods("POST") + + cfg := &tls.Config{ + MinVersion: tls.VersionTLS12, + CurvePreferences: []tls.CurveID{tls.CurveP521, tls.CurveP384, tls.CurveP256}, + PreferServerCipherSuites: true, + CipherSuites: []uint16{ + tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, + }, + } + + srv := &http.Server{ + Addr: config.API.Host + ":" + fmt.Sprint(config.API.Port), + Handler: r, + TLSConfig: cfg, + TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)), + ReadTimeout: 5 * time.Second, + WriteTimeout: 5 * time.Second, + IdleTimeout: 30 * time.Second, + ReadHeaderTimeout: 3 * time.Second, + } + + return srv +} + +func shutdown() { + defer Conf.API.MQ.Channel.Close() + defer Conf.API.MQ.Connection.Close() +} + +func readinessResponse(w http.ResponseWriter, _ *http.Request) { + statusCocde := http.StatusOK + + if Conf.API.MQ.Connection.IsClosed() { + statusCocde = http.StatusServiceUnavailable + newConn, err := broker.NewMQ(Conf.Broker) + if err != nil { + log.Errorf("failed to reconnect to MQ, reason: %v", err) + } else { + Conf.API.MQ = newConn + } + } + + if Conf.API.MQ.Channel.IsClosed() { + statusCocde = http.StatusServiceUnavailable + Conf.API.MQ.Connection.Close() + newConn, err := broker.NewMQ(Conf.Broker) + if err != nil { + log.Errorf("failed to reconnect to MQ, reason: %v", err) + } else { + Conf.API.MQ = newConn + } + } + + w.WriteHeader(statusCocde) +} + +func dataset(w http.ResponseWriter, r *http.Request) { + b, err := io.ReadAll(r.Body) + if err != nil { + respondWithError(w, http.StatusBadRequest, "failed to read request body") + + return + } + defer r.Body.Close() + + if err := schema.ValidateJSON(fmt.Sprintf("%s/../bigpicture/file-sync.json", Conf.Broker.SchemasPath), b); err != nil { + respondWithError(w, http.StatusBadRequest, fmt.Sprintf("eror on JSON validation: %s", err.Error())) + + return + } + + if err := parseDatasetMessage(b); err != nil { + w.WriteHeader(http.StatusInternalServerError) + } + + w.WriteHeader(http.StatusOK) +} + +// parseDatasetMessage parses the JSON blob and sends the relevant messages +func parseDatasetMessage(msg []byte) error { + log.Debugf("incoming blob %s", msg) + blob := syncDataset{} + _ = json.Unmarshal(msg, &blob) + + var accessionIDs []string + for _, files := range blob.DatasetFiles { + ingest := schema.IngestionTrigger{ + Type: "ingest", + User: blob.User, + FilePath: files.FilePath, + } + ingestMsg, err := json.Marshal(ingest) + if err != nil { + return fmt.Errorf("failed to marshal json messge: Reason %v", err) + } + corrID := uuid.New().String() + if err := Conf.API.MQ.SendMessage(corrID, Conf.Broker.Exchange, Conf.SyncAPI.IngestRouting, ingestMsg); err != nil { + return fmt.Errorf("failed to send ingest messge: Reason %v", err) + } + + accessionIDs = append(accessionIDs, files.FileID) + finalize := schema.IngestionAccession{ + Type: "accession", + User: blob.User, + FilePath: files.FilePath, + AccessionID: files.FileID, + DecryptedChecksums: []schema.Checksums{{Type: "sha256", Value: files.ShaSum}}, + } + finalizeMsg, err := json.Marshal(finalize) + if err != nil { 
+ return fmt.Errorf("failed to marshal json messge: Reason %v", err) + } + + if err := Conf.API.MQ.SendMessage(corrID, Conf.Broker.Exchange, Conf.SyncAPI.AccessionRouting, finalizeMsg); err != nil { + return fmt.Errorf("failed to send mapping messge: Reason %v", err) + } + } + + mappings := schema.DatasetMapping{ + Type: "mapping", + DatasetID: blob.DatasetID, + AccessionIDs: accessionIDs, + } + mappingMsg, err := json.Marshal(mappings) + if err != nil { + return fmt.Errorf("failed to marshal json messge: Reason %v", err) + } + + if err := Conf.API.MQ.SendMessage(fmt.Sprintf("%v", time.Now().Unix()), Conf.Broker.Exchange, Conf.SyncAPI.MappingRouting, mappingMsg); err != nil { + return fmt.Errorf("failed to send mapping messge: Reason %v", err) + } + + return nil +} + +func respondWithError(w http.ResponseWriter, code int, message string) { + respondWithJSON(w, code, map[string]string{"error": message}) +} + +func respondWithJSON(w http.ResponseWriter, code int, payload interface{}) { + log.Infoln(payload) + response, _ := json.Marshal(payload) + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(code) + _, err = w.Write(response) + if err != nil { + log.Errorf("failed to write HTTP response, reason: %v", err) + } +} + +func metadata(w http.ResponseWriter, r *http.Request) { + b, err := io.ReadAll(r.Body) + if err != nil { + respondWithError(w, http.StatusBadRequest, "failed to read request body") + + return + } + defer r.Body.Close() + + if err := schema.ValidateJSON(fmt.Sprintf("%s/bigpicture/metadata-sync.json", Conf.Broker.SchemasPath), b); err != nil { + respondWithError(w, http.StatusBadRequest, err.Error()) + + return + } + + w.WriteHeader(http.StatusOK) +} + +func basicAuth(auth http.HandlerFunc) http.HandlerFunc { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + username, password, ok := r.BasicAuth() + if ok { + usernameHash := sha256.Sum256([]byte(username)) + passwordHash := sha256.Sum256([]byte(password)) + expectedUsernameHash := sha256.Sum256([]byte(Conf.SyncAPI.APIUser)) + expectedPasswordHash := sha256.Sum256([]byte(Conf.SyncAPI.APIPassword)) + + usernameMatch := (subtle.ConstantTimeCompare(usernameHash[:], expectedUsernameHash[:]) == 1) + passwordMatch := (subtle.ConstantTimeCompare(passwordHash[:], expectedPasswordHash[:]) == 1) + + if usernameMatch && passwordMatch { + auth.ServeHTTP(w, r) + + return + } + } + + w.Header().Set("WWW-Authenticate", `Basic realm="restricted", charset="UTF-8"`) + http.Error(w, "Unauthorized", http.StatusUnauthorized) + }) +} diff --git a/sda/cmd/syncapi/syncapi.md b/sda/cmd/syncapi/syncapi.md new file mode 100644 index 000000000..d650e7612 --- /dev/null +++ b/sda/cmd/syncapi/syncapi.md @@ -0,0 +1,69 @@ +# sync-api + +The sync-api service is used in the [Bigpicture](https://bigpicture.eu/) project. + +## Configuration + +There are a number of options that can be set for the sync service. +These settings can be set by mounting a yaml-file at `/config.yaml` with settings. + +ex. + +```yaml +log: + level: "debug" + format: "json" +``` + +They may also be set using environment variables like: + +```bash +export LOG_LEVEL="debug" +export LOG_FORMAT="json" +``` + +### Service settings + +- `SYNC_API_PASSWORD`: password for the API user +- `SYNC_API_USER`: User that will be allowed to send POST requests to the API + +### RabbitMQ broker settings + +These settings control how sync connects to the RabbitMQ message broker. 
+
+- `BROKER_HOST`: hostname of the rabbitmq server
+- `BROKER_PORT`: rabbitmq broker port (commonly `5671` with TLS and `5672` without)
+- `BROKER_EXCHANGE`: exchange to send messages to
+- `BROKER_USER`: username to connect to rabbitmq
+- `BROKER_PASSWORD`: password to connect to rabbitmq
+- `BROKER_PREFETCHCOUNT`: Number of messages to pull from the message server at a time (defaults to 2)
+
+The default routing keys for sending ingestion, accession and mapping messages can be overridden by setting the following values:
+
+- `SYNC_API_ACCESSIONROUTING`
+- `SYNC_API_INGESTROUTING`
+- `SYNC_API_MAPPINGROUTING`
+
+### Logging settings
+
+- `LOG_FORMAT` can be set to “json” to get logs in json format. All other values result in text logging
+- `LOG_LEVEL` can be set to one of the following, in increasing order of severity:
+  - `trace`
+  - `debug`
+  - `info`
+  - `warn` (or `warning`)
+  - `error`
+  - `fatal`
+  - `panic`
+
+## Service Description
+
+The sync-api service facilitates replication of data and metadata between the nodes in the consortium.
+
+When enabled, the service performs the following tasks:
+
+1. Upon receiving a POST request with JSON data to the `/dataset` route:
+    1. Parse the JSON blob and validate it against the `file-sync` schema.
+    2. Build and send messages to start ingestion of files.
+    3. Build and send messages to assign stable IDs to files.
+    4. Build and send messages to map files to a dataset.
diff --git a/sda/cmd/syncapi/syncapi_test.go b/sda/cmd/syncapi/syncapi_test.go
new file mode 100644
index 000000000..c80134f92
--- /dev/null
+++ b/sda/cmd/syncapi/syncapi_test.go
@@ -0,0 +1,278 @@
+package main
+
+import (
+    "bytes"
+    "context"
+    "net/http"
+    "net/http/httptest"
+    "os"
+    "strconv"
+    "testing"
+    "time"
+
+    "github.com/gorilla/mux"
+    "github.com/neicnordic/sensitive-data-archive/internal/broker"
+    "github.com/neicnordic/sensitive-data-archive/internal/config"
+    "github.com/ory/dockertest/v3"
+    "github.com/ory/dockertest/v3/docker"
+    "github.com/spf13/viper"
+    "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/suite"
+
+    log "github.com/sirupsen/logrus"
+)
+
+var mqPort int
+
+type SyncAPITest struct {
+    suite.Suite
+}
+
+func TestSyncAPITestSuite(t *testing.T) {
+    suite.Run(t, new(SyncAPITest))
+}
+
+func TestMain(m *testing.M) {
+    if _, err := os.Stat("/.dockerenv"); err == nil {
+        m.Run()
+    }
+
+    // uses a sensible default on windows (tcp/http) and linux/osx (socket)
+    pool, err := dockertest.NewPool("")
+    if err != nil {
+        log.Fatalf("Could not construct pool: %s", err)
+    }
+
+    // uses pool to try to connect to Docker
+    err = pool.Client.Ping()
+    if err != nil {
+        log.Fatalf("Could not connect to Docker: %s", err)
+    }
+
+    // pulls an image, creates a container based on it and runs it
+    rabbitmq, err := pool.RunWithOptions(&dockertest.RunOptions{
+        Repository: "rabbitmq",
+        Tag:        "3-management-alpine",
+    }, func(config *docker.HostConfig) {
+        // set AutoRemove to true so that stopped container goes away by itself
+        config.AutoRemove = true
+        config.RestartPolicy = docker.RestartPolicy{
+            Name: "no",
+        }
+    })
+    if err != nil {
+        log.Fatalf("Could not start resource: %s", err)
+    }
+
+    mqPort, _ = strconv.Atoi(rabbitmq.GetPort("5672/tcp"))
+    mqHostAndPort := rabbitmq.GetHostPort("15672/tcp")
+
+    client := http.Client{Timeout: 5 * time.Second}
+    req, err := http.NewRequest(http.MethodGet, "http://"+mqHostAndPort+"/api/users", http.NoBody)
+    if err != nil {
+        log.Fatal(err)
+    }
+    req.SetBasicAuth("guest", "guest")
+
+    // exponential backoff-retry,
because the application in the container might not be ready to accept connections yet + if err := pool.Retry(func() error { + res, err := client.Do(req) + if err != nil { + return err + } + res.Body.Close() + + return nil + }); err != nil { + if err := pool.Purge(rabbitmq); err != nil { + log.Fatalf("Could not purge resource: %s", err) + } + log.Fatalf("Could not connect to rabbitmq: %s", err) + } + + log.Println("starting tests") + _ = m.Run() + + log.Println("tests completed") + if err := pool.Purge(rabbitmq); err != nil { + log.Fatalf("Could not purge resource: %s", err) + } + pvo := docker.PruneVolumesOptions{Filters: make(map[string][]string), Context: context.Background()} + if _, err := pool.Client.PruneVolumes(pvo); err != nil { + log.Fatalf("could not prune docker volumes: %s", err.Error()) + } +} + +func (suite *SyncAPITest) SetupTest() { + viper.Set("log.level", "debug") + viper.Set("log.format", "json") + + viper.Set("bpPrefix", "PFX") + + viper.Set("broker.host", "127.0.0.1") + viper.Set("broker.port", mqPort) + viper.Set("broker.user", "guest") + viper.Set("broker.password", "guest") + viper.Set("broker.queue", "mappings") + viper.Set("broker.exchange", "amq.direct") + viper.Set("broker.vhost", "/") + + viper.Set("schema.type", "isolated") + + viper.Set("sync.api.user", "dummy") + viper.Set("sync.api.password", "admin") +} + +func (suite *SyncAPITest) TestSetup() { + suite.SetupTest() + + conf, err := config.NewConfig("sync-api") + assert.NoError(suite.T(), err, "Failed to setup config") + assert.Equal(suite.T(), mqPort, conf.Broker.Port) + assert.Equal(suite.T(), mqPort, viper.GetInt("broker.port")) + + server := setup(conf) + assert.Equal(suite.T(), "0.0.0.0:8080", server.Addr) +} + +func (suite *SyncAPITest) TestShutdown() { + suite.SetupTest() + Conf, err = config.NewConfig("sync-api") + assert.NoError(suite.T(), err) + + Conf.API.MQ, err = broker.NewMQ(Conf.Broker) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), "127.0.0.1", Conf.API.MQ.Conf.Host) + + // make sure all conections are alive + assert.Equal(suite.T(), false, Conf.API.MQ.Channel.IsClosed()) + assert.Equal(suite.T(), false, Conf.API.MQ.Connection.IsClosed()) + + shutdown() + assert.Equal(suite.T(), true, Conf.API.MQ.Channel.IsClosed()) + assert.Equal(suite.T(), true, Conf.API.MQ.Connection.IsClosed()) +} + +func (suite *SyncAPITest) TestReadinessResponse() { + suite.SetupTest() + Conf, err = config.NewConfig("sync-api") + assert.NoError(suite.T(), err) + + Conf.API.MQ, err = broker.NewMQ(Conf.Broker) + assert.NoError(suite.T(), err) + + r := mux.NewRouter() + r.HandleFunc("/ready", readinessResponse) + ts := httptest.NewServer(r) + defer ts.Close() + + res, err := http.Get(ts.URL + "/ready") + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), http.StatusOK, res.StatusCode) + defer res.Body.Close() + + // close the connection to force a reconneciton + Conf.API.MQ.Connection.Close() + res, err = http.Get(ts.URL + "/ready") + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), http.StatusServiceUnavailable, res.StatusCode) + defer res.Body.Close() + + // reconnect should be fast so now this should pass + res, err = http.Get(ts.URL + "/ready") + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), http.StatusOK, res.StatusCode) + defer res.Body.Close() + + // close the channel to force a reconneciton + Conf.API.MQ.Channel.Close() + res, err = http.Get(ts.URL + "/ready") + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), http.StatusServiceUnavailable, res.StatusCode) + defer 
res.Body.Close() + + // reconnect should be fast so now this should pass + res, err = http.Get(ts.URL + "/ready") + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), http.StatusOK, res.StatusCode) + defer res.Body.Close() +} + +func (suite *SyncAPITest) TestDatasetRoute() { + suite.SetupTest() + Conf, err = config.NewConfig("sync-api") + assert.NoError(suite.T(), err) + + Conf.API.MQ, err = broker.NewMQ(Conf.Broker) + assert.NoError(suite.T(), err) + + Conf.Broker.SchemasPath = "../../schemas/isolated/" + + r := mux.NewRouter() + r.HandleFunc("/dataset", dataset) + ts := httptest.NewServer(r) + defer ts.Close() + + goodJSON := []byte(`{"user": "test.user@example.com", "dataset_id": "cd532362-e06e-4460-8490-b9ce64b8d9e6", "dataset_files": [{"filepath": "inbox/user/file-1.c4gh","file_id": "5fe7b660-afea-4c3a-88a9-3daabf055ebb", "sha256": "82E4e60e7beb3db2e06A00a079788F7d71f75b61a4b75f28c4c942703dabb6d6"}, {"filepath": "inbox/user/file2.c4gh","file_id": "ed6af454-d910-49e3-8cda-488a6f246e76", "sha256": "c967d96e56dec0f0cfee8f661846238b7f15771796ee1c345cae73cd812acc2b"}]}`) + good, err := http.Post(ts.URL+"/dataset", "application/json", bytes.NewBuffer(goodJSON)) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), http.StatusOK, good.StatusCode) + defer good.Body.Close() + + badJSON := []byte(`{"dataset_id": "cd532362-e06e-4460-8490-b9ce64b8d9e7", "dataset_files": []}`) + bad, err := http.Post(ts.URL+"/dataset", "application/json", bytes.NewBuffer(badJSON)) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), http.StatusBadRequest, bad.StatusCode) + defer bad.Body.Close() +} + +func (suite *SyncAPITest) TestMetadataRoute() { + Conf = &config.Config{} + Conf.Broker.SchemasPath = "../../schemas" + + r := mux.NewRouter() + r.HandleFunc("/metadata", metadata) + ts := httptest.NewServer(r) + defer ts.Close() + + goodJSON := []byte(`{"dataset_id": "cd532362-e06e-4460-8490-b9ce64b8d9e7", "metadata": {"dummy":"data"}}`) + good, err := http.Post(ts.URL+"/metadata", "application/json", bytes.NewBuffer(goodJSON)) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), http.StatusOK, good.StatusCode) + defer good.Body.Close() + + badJSON := []byte(`{"dataset_id": "phail", "metadata": {}}`) + bad, err := http.Post(ts.URL+"/metadata", "application/json", bytes.NewBuffer(badJSON)) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), http.StatusBadRequest, bad.StatusCode) + defer bad.Body.Close() +} + +func (suite *SyncAPITest) TestBasicAuth() { + Conf = &config.Config{} + Conf.Broker.SchemasPath = "../../schemas" + Conf.SyncAPI = config.SyncAPIConf{ + APIUser: "dummy", + APIPassword: "test", + } + + r := mux.NewRouter() + r.HandleFunc("/metadata", basicAuth(metadata)) + ts := httptest.NewServer(r) + defer ts.Close() + + goodJSON := []byte(`{"dataset_id": "cd532362-e06e-4460-8490-b9ce64b8d9e7", "metadata": {"dummy":"data"}}`) + req, err := http.NewRequest("POST", ts.URL+"/metadata", bytes.NewBuffer(goodJSON)) + assert.NoError(suite.T(), err) + req.SetBasicAuth(Conf.SyncAPI.APIUser, Conf.SyncAPI.APIPassword) + good, err := ts.Client().Do(req) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), http.StatusOK, good.StatusCode) + defer good.Body.Close() + + req.SetBasicAuth(Conf.SyncAPI.APIUser, "wrongpass") + bad, err := ts.Client().Do(req) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), http.StatusUnauthorized, bad.StatusCode) + defer bad.Body.Close() +} diff --git a/sda/go.mod b/sda/go.mod index 7efa8d4ef..064747b59 100644 --- a/sda/go.mod +++ b/sda/go.mod @@ 
-43,6 +43,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect + github.com/gorilla/mux v1.8.1 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/imdario/mergo v0.3.12 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect diff --git a/sda/go.sum b/sda/go.sum index fa409c654..8d84f3085 100644 --- a/sda/go.sum +++ b/sda/go.sum @@ -167,6 +167,8 @@ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= +github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= diff --git a/sda/internal/config/config.go b/sda/internal/config/config.go index 96f3b50cd..91100e3d5 100644 --- a/sda/internal/config/config.go +++ b/sda/internal/config/config.go @@ -9,11 +9,10 @@ import ( "strings" "time" + "github.com/neicnordic/crypt4gh/keys" "github.com/neicnordic/sensitive-data-archive/internal/broker" "github.com/neicnordic/sensitive-data-archive/internal/database" "github.com/neicnordic/sensitive-data-archive/internal/storage" - - "github.com/neicnordic/crypt4gh/keys" "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/spf13/viper" @@ -46,6 +45,25 @@ type Config struct { API APIConf Notify SMTPConf Orchestrator OrchestratorConf + Sync Sync + SyncAPI SyncAPIConf +} + +type Sync struct { + CenterPrefix string + Destination storage.Conf + RemoteHost string + RemotePassword string + RemotePort int + RemoteUser string +} + +type SyncAPIConf struct { + APIPassword string + APIUser string + AccessionRouting string `default:"accession"` + IngestRouting string `default:"ingest"` + MappingRouting string `default:"mappings"` } type APIConf struct { @@ -257,6 +275,56 @@ func NewConfig(app string) (*Config, error) { "inbox.bucket", } viper.Set("inbox.type", S3) + case "sync": + requiredConfVars = []string{ + "broker.host", + "broker.port", + "broker.user", + "broker.password", + "broker.queue", + "c4gh.filepath", + "c4gh.passphrase", + "c4gh.syncPubKeyPath", + "db.host", + "db.port", + "db.user", + "db.password", + "db.database", + "sync.centerPrefix", + "sync.remote.host", + "sync.remote.user", + "sync.remote.password", + } + + switch viper.GetString("archive.type") { + case S3: + requiredConfVars = append(requiredConfVars, []string{"archive.url", "archive.accesskey", "archive.secretkey", "archive.bucket"}...) + case POSIX: + requiredConfVars = append(requiredConfVars, []string{"archive.location"}...) + default: + return nil, fmt.Errorf("archive.type not set") + } + + switch viper.GetString("sync.destination.type") { + case S3: + requiredConfVars = append(requiredConfVars, []string{"sync.destination.url", "sync.destination.accesskey", "sync.destination.secretkey", "sync.destination.bucket"}...) 
+ case POSIX: + requiredConfVars = append(requiredConfVars, []string{"sync.destination.location"}...) + case SFTP: + requiredConfVars = append(requiredConfVars, []string{"sync.destination.sftp.host", "sync.destination.sftp.port", "sync.destination.sftp.userName", "sync.destination.sftp.pemKeyPath", "sync.destination.sftp.pemKeyPass"}...) + default: + return nil, fmt.Errorf("sync.destination.type not set") + } + case "sync-api": + requiredConfVars = []string{ + "broker.exchange", + "broker.host", + "broker.port", + "broker.user", + "broker.password", + "sync.api.user", + "sync.api.password", + } case "verify": requiredConfVars = []string{ "broker.host", @@ -410,6 +478,29 @@ func NewConfig(app string) (*Config, error) { if err != nil { return nil, err } + case "sync": + if err := c.configBroker(); err != nil { + return nil, err + } + + if err := c.configDatabase(); err != nil { + return nil, err + } + + c.configArchive() + c.configSync() + c.configSchemas() + case "sync-api": + if err := c.configBroker(); err != nil { + return nil, err + } + + if err := c.configAPI(); err != nil { + return nil, err + } + + c.configSyncAPI() + c.configSchemas() + case "verify": c.configArchive() @@ -722,6 +813,47 @@ func (c *Config) configSMTP() { c.Notify.FromAddr = viper.GetString("smtp.from") } +// configSync provides configuration for the sync destination storage +func (c *Config) configSync() { + switch viper.GetString("sync.destination.type") { + case S3: + c.Sync.Destination.Type = S3 + c.Sync.Destination.S3 = configS3Storage("sync.destination") + case SFTP: + c.Sync.Destination.Type = SFTP + c.Sync.Destination.SFTP = configSFTP("sync.destination") + case POSIX: + c.Sync.Destination.Type = POSIX + c.Sync.Destination.Posix.Location = viper.GetString("sync.destination.location") + } + + c.Sync.RemoteHost = viper.GetString("sync.remote.host") + if viper.IsSet("sync.remote.port") { + c.Sync.RemotePort = viper.GetInt("sync.remote.port") + } + c.Sync.RemotePassword = viper.GetString("sync.remote.password") + c.Sync.RemoteUser = viper.GetString("sync.remote.user") + c.Sync.CenterPrefix = viper.GetString("sync.centerPrefix") +} + +// configSyncAPI provides configuration for the sync-api service +func (c *Config) configSyncAPI() { + c.SyncAPI = SyncAPIConf{} + c.SyncAPI.APIPassword = viper.GetString("sync.api.password") + c.SyncAPI.APIUser = viper.GetString("sync.api.user") + + if viper.IsSet("sync.api.AccessionRouting") { + c.SyncAPI.AccessionRouting = viper.GetString("sync.api.AccessionRouting") + } + if viper.IsSet("sync.api.IngestRouting") { + c.SyncAPI.IngestRouting = viper.GetString("sync.api.IngestRouting") + } + if viper.IsSet("sync.api.MappingRouting") { + c.SyncAPI.MappingRouting = viper.GetString("sync.api.MappingRouting") + } + +} + // GetC4GHKey reads and decrypts and returns the c4gh key func GetC4GHKey() (*[32]byte, error) { keyPath := viper.GetString("c4gh.filepath") @@ -745,8 +877,7 @@ func GetC4GHKey() (*[32]byte, error) { // GetC4GHPublicKey reads the c4gh public key func GetC4GHPublicKey() (*[32]byte, error) { - keyPath := viper.GetString("c4gh.backupPubKey") - + keyPath := viper.GetString("c4gh.syncPubKeyPath") // Make sure the key path and passphrase is valid keyFile, err := os.Open(keyPath) if err != nil { @@ -869,12 +1000,3 @@ func TLSConfigProxy(c *Config) (*tls.Config, error) { return cfg, nil } - -// CopyHeader reads the config and returns if the header will be copied -func CopyHeader() bool { - if viper.IsSet("backup.copyHeader") { - return viper.GetBool("backup.copyHeader") - } - - 
return false -} diff --git a/sda/internal/config/config_test.go b/sda/internal/config/config_test.go index 08c841f59..d599a711b 100644 --- a/sda/internal/config/config_test.go +++ b/sda/internal/config/config_test.go @@ -1,6 +1,7 @@ package config import ( + "encoding/base64" "errors" "fmt" "os" @@ -267,8 +268,99 @@ func (suite *ConfigTestSuite) TestNotifyConfiguration() { assert.NotNil(suite.T(), config) } -func (suite *ConfigTestSuite) TestCopyHeader() { - viper.Set("backup.copyHeader", "true") - cHeader := CopyHeader() - assert.Equal(suite.T(), cHeader, true, "The CopyHeader does not work") +func (suite *ConfigTestSuite) TestSyncConfig() { + suite.SetupTest() + // At this point we should fail because we lack configuration + config, err := NewConfig("backup") + assert.Error(suite.T(), err) + assert.Nil(suite.T(), config) + + viper.Set("archive.type", "posix") + viper.Set("archive.location", "test") + viper.Set("sync.centerPrefix", "prefix") + viper.Set("sync.destination.type", "posix") + viper.Set("sync.destination.location", "test") + viper.Set("sync.remote.host", "https://test.org") + viper.Set("sync.remote.user", "test") + viper.Set("sync.remote.password", "test") + viper.Set("c4gh.filepath", "/keys/key") + viper.Set("c4gh.passphrase", "pass") + viper.Set("c4gh.syncPubKeyPath", "/keys/recipient") + config, err = NewConfig("sync") + assert.NotNil(suite.T(), config) + assert.NoError(suite.T(), err) + assert.NotNil(suite.T(), config.Broker) + assert.Equal(suite.T(), "testhost", config.Broker.Host) + assert.Equal(suite.T(), 123, config.Broker.Port) + assert.Equal(suite.T(), "testuser", config.Broker.User) + assert.Equal(suite.T(), "testpassword", config.Broker.Password) + assert.Equal(suite.T(), "testqueue", config.Broker.Queue) + assert.NotNil(suite.T(), config.Database) + assert.Equal(suite.T(), "test", config.Database.Host) + assert.Equal(suite.T(), 123, config.Database.Port) + assert.Equal(suite.T(), "test", config.Database.User) + assert.Equal(suite.T(), "test", config.Database.Password) + assert.Equal(suite.T(), "test", config.Database.Database) + assert.NotNil(suite.T(), config.Archive) + assert.NotNil(suite.T(), config.Archive.Posix) + assert.Equal(suite.T(), "test", config.Archive.Posix.Location) + assert.NotNil(suite.T(), config.Sync) + assert.NotNil(suite.T(), config.Sync.Destination.Posix) + assert.Equal(suite.T(), "test", config.Sync.Destination.Posix.Location) +} +func (suite *ConfigTestSuite) TestGetC4GHPublicKey() { + pubKey := "-----BEGIN CRYPT4GH PUBLIC KEY-----\nuQO46R56f/Jx0YJjBAkZa2J6n72r6HW/JPMS4tfepBs=\n-----END CRYPT4GH PUBLIC KEY-----" + pubKeyPath, _ := os.MkdirTemp("", "pubkey") + err := os.WriteFile(pubKeyPath+"/c4gh.pub", []byte(pubKey), 0600) + assert.NoError(suite.T(), err) + + var kb [32]byte + k, _ := base64.StdEncoding.DecodeString("uQO46R56f/Jx0YJjBAkZa2J6n72r6HW/JPMS4tfepBs=") + copy(kb[:], k) + + viper.Set("c4gh.syncPubKeyPath", pubKeyPath+"/c4gh.pub") + pkBytes, err := GetC4GHPublicKey() + assert.NoError(suite.T(), err) + assert.NotNil(suite.T(), pkBytes) + assert.Equal(suite.T(), pkBytes, &kb, "GetC4GHPublicKey didn't return correct pubKey") + + defer os.RemoveAll(pubKeyPath) +} +func (suite *ConfigTestSuite) TestGetC4GHKey() { + key := "-----BEGIN CRYPT4GH ENCRYPTED PRIVATE KEY-----\nYzRnaC12MQAGc2NyeXB0ABQAAAAAEna8op+BzhTVrqtO5Rx7OgARY2hhY2hhMjBfcG9seTEzMDUAPMx2Gbtxdva0M2B0tb205DJT9RzZmvy/9ZQGDx9zjlObj11JCqg57z60F0KhJW+j/fzWL57leTEcIffRTA==\n-----END CRYPT4GH ENCRYPTED PRIVATE KEY-----" + keyPath, _ := os.MkdirTemp("", "key") + err := 
os.WriteFile(keyPath+"/c4gh.key", []byte(key), 0600) + assert.NoError(suite.T(), err) + + viper.Set("c4gh.filepath", keyPath+"/c4gh.key") + pkBytes, err := GetC4GHKey() + assert.EqualError(suite.T(), err, "chacha20poly1305: message authentication failed") + assert.Nil(suite.T(), pkBytes) + + viper.Set("c4gh.filepath", keyPath+"/c4gh.key") + viper.Set("c4gh.passphrase", "test") + pkBytes, err = GetC4GHKey() + assert.NoError(suite.T(), err) + assert.NotNil(suite.T(), pkBytes) + + defer os.RemoveAll(keyPath) +} + +func (suite *ConfigTestSuite) TestConfigSyncAPI() { + suite.SetupTest() + noConfig, err := NewConfig("sync-api") + assert.Error(suite.T(), err) + assert.Nil(suite.T(), noConfig) + + viper.Set("sync.api.user", "user") + viper.Set("sync.api.password", "password") + config, err := NewConfig("sync-api") + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), "user", config.SyncAPI.APIUser) + assert.Equal(suite.T(), "password", config.SyncAPI.APIPassword) + + viper.Set("sync.api.AccessionRouting", "wrong") + config, err = NewConfig("sync-api") + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), "wrong", config.SyncAPI.AccessionRouting) } diff --git a/sda/internal/database/database.go b/sda/internal/database/database.go index be16c19d4..478f1c349 100644 --- a/sda/internal/database/database.go +++ b/sda/internal/database/database.go @@ -42,6 +42,12 @@ type FileInfo struct { DecryptedSize int64 } +type SyncData struct { + User string + FilePath string + Checksum string +} + // SchemaName is the name of the remote database schema to query var SchemaName = "sda" diff --git a/sda/internal/database/db_functions.go b/sda/internal/database/db_functions.go index 5bb6ac8f5..540714267 100644 --- a/sda/internal/database/db_functions.go +++ b/sda/internal/database/db_functions.go @@ -509,3 +509,116 @@ func (dbs *SDAdb) getFileInfo(id string) (FileInfo, error) { return info, nil } + +// GetHeaderForStableID retrieves the file header by using stable id +func (dbs *SDAdb) GetHeaderForStableID(stableID string) ([]byte, error) { + dbs.checkAndReconnectIfNeeded() + const query = "SELECT header from sda.files WHERE stable_id = $1" + var hexString string + if err := dbs.DB.QueryRow(query, stableID).Scan(&hexString); err != nil { + return nil, err + } + + header, err := hex.DecodeString(hexString) + if err != nil { + return nil, err + } + + return header, nil +} + +// GetSyncData retrieves the file information needed to sync a dataset +func (dbs *SDAdb) GetSyncData(accessionID string) (SyncData, error) { + var ( + s SyncData + err error + ) + + for count := 1; count <= RetryTimes; count++ { + s, err = dbs.getSyncData(accessionID) + if err == nil { + break + } + time.Sleep(time.Duration(math.Pow(3, float64(count))) * time.Second) + } + + return s, err +} + +// getSyncData is the actual function performing work for GetSyncData +func (dbs *SDAdb) getSyncData(accessionID string) (SyncData, error) { + dbs.checkAndReconnectIfNeeded() + + const query = "SELECT submission_user, submission_file_path from sda.files WHERE stable_id = $1;" + var data SyncData + if err := dbs.DB.QueryRow(query, accessionID).Scan(&data.User, &data.FilePath); err != nil { + return SyncData{}, err + } + + const checksum = "SELECT checksum from sda.checksums WHERE source = 'UNENCRYPTED' and file_id = (SELECT id FROM sda.files WHERE stable_id = $1);" + if err := dbs.DB.QueryRow(checksum, accessionID).Scan(&data.Checksum); err != nil { + return SyncData{}, err + } + + return data, nil +} + +// CheckIfDatasetExists checks if a dataset 
is already registered +func (dbs *SDAdb) CheckIfDatasetExists(datasetID string) (bool, error) { + var ( + ds bool + err error + ) + + for count := 1; count <= RetryTimes; count++ { + ds, err = dbs.checkIfDatasetExists(datasetID) + if err == nil { + break + } + time.Sleep(time.Duration(math.Pow(3, float64(count))) * time.Second) + } + + return ds, err +} + +// checkIfDatasetExists is the actual function performing work for CheckIfDatasetExists +func (dbs *SDAdb) checkIfDatasetExists(datasetID string) (bool, error) { + dbs.checkAndReconnectIfNeeded() + + const query = "SELECT EXISTS(SELECT id from sda.datasets WHERE stable_id = $1);" + var yesNo bool + if err := dbs.DB.QueryRow(query, datasetID).Scan(&yesNo); err != nil { + return yesNo, err + } + + return yesNo, nil +} + +// GetArchivePath retrieves the archive_file_path for a file with a given stable ID +func (dbs *SDAdb) GetArchivePath(stableID string) (string, error) { + var ( + err error + count int + archivePath string + ) + + for count == 0 || (err != nil && count < RetryTimes) { + archivePath, err = dbs.getArchivePath(stableID) + count++ + } + + return archivePath, err +} +func (dbs *SDAdb) getArchivePath(stableID string) (string, error) { + dbs.checkAndReconnectIfNeeded() + db := dbs.DB + const getFileID = "SELECT archive_file_path from sda.files WHERE stable_id = $1;" + + var archivePath string + err := db.QueryRow(getFileID, stableID).Scan(&archivePath) + if err != nil { + return "", err + } + + return archivePath, nil +} diff --git a/sda/internal/database/db_functions_test.go b/sda/internal/database/db_functions_test.go index 8f6ac6995..9420731b4 100644 --- a/sda/internal/database/db_functions_test.go +++ b/sda/internal/database/db_functions_test.go @@ -364,3 +364,105 @@ func (suite *DatabaseTests) TestUpdateDatasetEvent() { err = db.UpdateDatasetEvent(dID, "deprecated", "{\"type\": \"deprecate\"}") assert.NoError(suite.T(), err, "got (%v) when creating new connection", err) } + +func (suite *DatabaseTests) TestGetHeaderForStableID() { + db, err := NewSDAdb(suite.dbConf) + assert.NoError(suite.T(), err, "got %v when creating new connection", err) + + // register a file in the database + fileID, err := db.RegisterFile("/testuser/TestGetHeaderForStableID.c4gh", "testuser") + assert.NoError(suite.T(), err, "failed to register file in database") + + err = db.StoreHeader([]byte("HEADER"), fileID) + assert.NoError(suite.T(), err, "failed to store file header") + + stableID := "TEST:010-1234-4567" + err = db.SetAccessionID(stableID, fileID) + assert.NoError(suite.T(), err, "got (%v) when setting stable ID: %s, %s", err, stableID, fileID) + + header, err := db.GetHeaderForStableID("TEST:010-1234-4567") + assert.NoError(suite.T(), err, "failed to get header for stable ID: %v", err) + assert.Equal(suite.T(), header, []byte("HEADER"), "did not get expected header") +} + +func (suite *DatabaseTests) TestGetSyncData() { + db, err := NewSDAdb(suite.dbConf) + assert.NoError(suite.T(), err, "got %v when creating new connection", err) + + // register a file in the database + fileID, err := db.RegisterFile("/testuser/TestGetGetSyncData.c4gh", "testuser") + assert.NoError(suite.T(), err, "failed to register file in database") + + checksum := fmt.Sprintf("%x", sha256.New().Sum(nil)) + fileInfo := FileInfo{fmt.Sprintf("%x", sha256.New().Sum(nil)), 1234, "/tmp/TestGetGetSyncData.c4gh", checksum, 999} + corrID := uuid.New().String() + err = db.SetArchived(fileInfo, fileID, corrID) + assert.NoError(suite.T(), err, "failed to mark file as Archived") + + err = 
db.SetVerified(fileInfo, fileID, corrID) + assert.NoError(suite.T(), err, "failed to mark file as Verified") + + stableID := "TEST:000-1111-2222" + err = db.SetAccessionID(stableID, fileID) + assert.NoError(suite.T(), err, "got (%v) when setting stable ID: %s, %s", err, stableID, fileID) + + fileData, err := db.getSyncData("TEST:000-1111-2222") + assert.NoError(suite.T(), err, "failed to get sync data for file") + assert.Equal(suite.T(), "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", fileData.Checksum, "did not get expected file checksum") + assert.Equal(suite.T(), "/testuser/TestGetGetSyncData.c4gh", fileData.FilePath, "did not get expected file path") + assert.Equal(suite.T(), "testuser", fileData.User, "did not get expected user") +} + +func (suite *DatabaseTests) TestCheckIfDatasetExists() { + db, err := NewSDAdb(suite.dbConf) + assert.NoError(suite.T(), err, "got %v when creating new connection", err) + + accessions := []string{} + for i := 0; i <= 3; i++ { + fileID, err := db.RegisterFile(fmt.Sprintf("/testuser/TestCheckIfDatasetExists-%d.c4gh", i), "testuser") + assert.NoError(suite.T(), err, "failed to register file in database") + + err = db.SetAccessionID(fmt.Sprintf("accession-%d", i), fileID) + assert.NoError(suite.T(), err, "got (%v) when getting file archive information", err) + + accessions = append(accessions, fmt.Sprintf("accession-%d", i)) + } + + diSet := map[string][]string{ + "dataset": accessions[0:3], + } + + for di, acs := range diSet { + err := db.MapFilesToDataset(di, acs) + assert.NoError(suite.T(), err, "failed to map file to dataset") + } + + ok, err := db.checkIfDatasetExists("dataset") + assert.NoError(suite.T(), err, "check if dataset exists failed") + assert.Equal(suite.T(), ok, true) + + ok, err = db.checkIfDatasetExists("missing dataset") + assert.NoError(suite.T(), err, "check if dataset exists failed") + assert.Equal(suite.T(), ok, false) +} + +func (suite *DatabaseTests) TestGetArchivePath() { + db, err := NewSDAdb(suite.dbConf) + assert.NoError(suite.T(), err, "got (%v) when creating new connection", err) + + fileID, err := db.RegisterFile("/testuser/TestGetArchivePath-001.c4gh", "testuser") + assert.NoError(suite.T(), err, "failed to register file in database") + + checksum := fmt.Sprintf("%x", sha256.New()) + corrID := uuid.New().String() + fileInfo := FileInfo{fmt.Sprintf("%x", sha256.New()), 1234, corrID, checksum, 999} + err = db.SetArchived(fileInfo, fileID, corrID) + assert.NoError(suite.T(), err, "failed to mark file as Archived") + + err = db.SetAccessionID("acession-0001", fileID) + assert.NoError(suite.T(), err, "got (%v) when getting file archive information", err) + + path, err := db.getArchivePath("acession-0001") + assert.NoError(suite.T(), err, "getArchivePath failed") + assert.Equal(suite.T(), path, corrID) +} diff --git a/sda/internal/schema/schema.go b/sda/internal/schema/schema.go index 9c2c68eec..c01440469 100644 --- a/sda/internal/schema/schema.go +++ b/sda/internal/schema/schema.go @@ -62,6 +62,10 @@ func getStructName(path string) interface{} { return new(IngestionUserError) case "ingestion-verification": return new(IngestionVerification) + case "file-sync": + return new(SyncDataset) + case "metadata-sync": + return new(SyncMetadata) default: return "" } @@ -135,10 +139,9 @@ type IngestionCompletion struct { } type IngestionTrigger struct { - Type string `json:"type"` - User string `json:"user"` - FilePath string `json:"filepath"` - EncryptedChecksums []Checksums `json:"encrypted_checksums"` + Type string 
`json:"type"` + User string `json:"user"` + FilePath string `json:"filepath"` } type IngestionUserError struct { @@ -155,3 +158,24 @@ type IngestionVerification struct { EncryptedChecksums []Checksums `json:"encrypted_checksums"` ReVerify bool `json:"re_verify"` } + +type SyncDataset struct { + DatasetID string `json:"dataset_id"` + DatasetFiles []DatasetFiles `json:"dataset_files"` + User string `json:"user"` +} + +type DatasetFiles struct { + FilePath string `json:"filepath"` + FileID string `json:"file_id"` + ShaSum string `json:"sha256"` +} + +type SyncMetadata struct { + DatasetID string `json:"dataset_id"` + Metadata interface{} `json:"metadata"` +} + +type Metadata struct { + Metadata interface{} +} diff --git a/sda/internal/schema/schema_test.go b/sda/internal/schema/schema_test.go index d3600f956..d4c85b647 100644 --- a/sda/internal/schema/schema_test.go +++ b/sda/internal/schema/schema_test.go @@ -261,9 +261,6 @@ func TestValidateJSONIngestionTrigger(t *testing.T) { Type: "ingest", User: "JohnDoe", FilePath: "path/to/file", - EncryptedChecksums: []Checksums{ - {Type: "sha256", Value: "da886a89637d125ef9f15f6d676357f3a9e5e10306929f0bad246375af89c2e2"}, - }, } msg, _ := json.Marshal(okMsg) @@ -273,9 +270,6 @@ func TestValidateJSONIngestionTrigger(t *testing.T) { badMsg := IngestionTrigger{ User: "JohnDoe", FilePath: "path/to file", - EncryptedChecksums: []Checksums{ - {Type: "sha256", Value: "da886a89637d125ef9f15f6d676357f3a9e5e10306929f0bad246375af89c2e2"}, - }, } msg, _ = json.Marshal(badMsg) @@ -416,3 +410,53 @@ func TestValidateJSONIsolatedIngestionCompletion(t *testing.T) { msg, _ = json.Marshal(badMsg) assert.Error(t, ValidateJSON(fmt.Sprintf("%s/isolated/ingestion-completion.json", schemaPath), msg)) } + +func TestValidateJSONBigpictureFileSync(t *testing.T) { + okMsg := SyncDataset{ + DatasetID: "cd532362-e06e-4460-8490-b9ce64b8d9e7", + DatasetFiles: []DatasetFiles{ + { + FilePath: "inbox/user/file1.c4gh", + FileID: "5fe7b660-afea-4c3a-88a9-3daabf055ebb", + ShaSum: "82E4e60e7beb3db2e06A00a079788F7d71f75b61a4b75f28c4c942703dabb6d6", + }, + { + FilePath: "inbox/user/file2.c4gh", + FileID: "ed6af454-d910-49e3-8cda-488a6f246e76", + ShaSum: "c967d96e56dec0f0cfee8f661846238b7f15771796ee1c345cae73cd812acc2b", + }, + }, + User: "test.user@example.com", + } + + msg, _ := json.Marshal(okMsg) + assert.Nil(t, ValidateJSON(fmt.Sprintf("%s/bigpicture/file-sync.json", schemaPath), msg)) + + badMsg := SyncDataset{ + DatasetID: "cd532362-e06e-4460-8490-b9ce64b8d9e7", + DatasetFiles: []DatasetFiles{{}}, + } + + msg, _ = json.Marshal(badMsg) + assert.Error(t, ValidateJSON(fmt.Sprintf("%s/bigpicture/file-sync.json", schemaPath), msg)) +} + +func TestValidateJSONBigpictureMetadataSync(t *testing.T) { + okMsg := SyncMetadata{ + DatasetID: "cd532362-e06e-4460-8490-b9ce64b8d9e7", + Metadata: Metadata{ + Metadata: "foo", + }, + } + + msg, _ := json.Marshal(okMsg) + assert.Nil(t, ValidateJSON(fmt.Sprintf("%s/bigpicture/metadata-sync.json", schemaPath), msg)) + + badMsg := SyncMetadata{ + DatasetID: "cd532362-e06e-4460-8490-b9ce64b8d9e7", + Metadata: nil, + } + + msg, _ = json.Marshal(badMsg) + assert.Error(t, ValidateJSON(fmt.Sprintf("%s/bigpicture/metadata-sync.json", schemaPath), msg)) +} diff --git a/sda/schemas/bigpicture/file-sync.json b/sda/schemas/bigpicture/file-sync.json new file mode 100644 index 000000000..1127acab2 --- /dev/null +++ b/sda/schemas/bigpicture/file-sync.json @@ -0,0 +1,119 @@ +{ + "title": "JSON schema for file syncing message interface.", + "$id": 
"https://github.com/neicnordic/sensitive-data-archive/tree/master/sda/schemas/bigpicture/file-sync.json", + "$schema": "http://json-schema.org/draft-07/schema", + "type": "object", + "required": [ + "dataset_id", + "dataset_files", + "user" + ], + "additionalProperties": false, + "definitions": { + "dataset_files": { + "$id": "#/definitions/dataset_files", + "type": "object", + "minProperties": 3, + "maxProperties": 3, + "title": "File information schema", + "description": "Information about a file", + "examples": [ + { + "filepath": "path/to/file", + "file_id": "16f3edd1-3c40-4284-9f82-1055361e655b", + "sha256": "82e4e60e7beb3db2e06a00a079788f7d71f75b61a4b75f28c4c942703dabb6d6" + } + ], + "required": [ + "filepath", + "file_id", + "sha256" + ], + "additionalProperties": false, + "properties": { + "filepath": { + "$id": "#/definitions/dataset_files/properties/filepath", + "type": "string", + "title": "The inbox filepath", + "description": "The inbox filepath", + "minLength": 5 + }, + "file_id": { + "$id": "#/definitions/dataset_files/properties/file_id", + "type": "string", + "title": "The file id", + "description": "The stable file id (UUID)", + "minLength": 11, + "pattern": "^\\S+$", + "examples": [ + "16f3edd1-3c40-4284-9f82-1055361e655b" + ] + }, + "sha256": { + "$id": "#/definitions/dataset_files/properties/sha256", + "type": "string", + "title": "The decrypted checksum value in hex format", + "description": "The checksum value in (case-insensitive) hex format", + "pattern": "^[a-fA-F0-9]{64}$", + "examples": [ + "82E4e60e7beb3db2e06A00a079788F7d71f75b61a4b75f28c4c942703dabb6d6" + ] + } + } + } + }, + "properties": { + "dataset_id": { + "$id": "#/properties/dataset_id", + "type": "string", + "title": "The Accession identifier for the dataset", + "description": "The Accession identifier for the dataset", + "minLength": 11, + "pattern": "^\\S+$", + "examples": [ + "anyidentifier" + ] + }, + "dataset_files": { + "$id": "#/properties/dataset_files", + "type": "array", + "title": "The files in that dataset", + "description": "The files in that dataset", + "minItems": 1, + "examples": [ + [ + { + "filepath": "path/to/file1.c4gh", + "file_id": "16f3edd1-3c40-4284-9f82-1055361e655b" + }, + { + "filepath": "path/to/file2.c4gh", + "file_id": "ba824437-ffc0-4431-b6a0-73968c1bb1ed" + } + ] + ], + "additionalItems": false, + "items": { + "$ref": "#/definitions/dataset_files", + "properties": { + "filepath": { + "$ref": "#/definitions/dataset_files/properties/filepath" + }, + "file_id": { + "$ref": "#/definitions/dataset_files/properties/file_id" + } + } + } + }, + "user": { + "$id": "#/properties/user", + "type": "string", + "title": "The username", + "description": "The username", + "minLength": 5, + "examples": [ + "user.name@example.com" + ] + } + } +} diff --git a/sda/schemas/bigpicture/metadata-sync.json b/sda/schemas/bigpicture/metadata-sync.json new file mode 100644 index 000000000..049cc345a --- /dev/null +++ b/sda/schemas/bigpicture/metadata-sync.json @@ -0,0 +1,32 @@ +{ + "title": "JSON schema for metadata syncing message interface.", + "$id": "https://github.com/neicnordic/sensitive-data-archive/tree/master/sda/schemas/bigpicture/metadata-sync.json", + "$schema": "http://json-schema.org/draft-07/schema", + "type": "object", + "required": [ + "dataset_id", + "metadata" + ], + "additionalProperties": false, + "properties": { + "dataset_id": { + "$id": "#/properties/dataset_id", + "type": "string", + "title": "The Accession identifier for the 
dataset", + "description": "The Accession identifier for the dataset", + "minLength": 11, + "pattern": "^\\S+$", + "examples": [ + "anyidentifier" + ] + }, + "metadata": { + "$id": "#/properties/metadata", + "type": "object", + "title": "Metadata for the dataset", + "description": "Metadata for the dataset", + "minProperties": 1, + "pattern": "^\\S+$" + } + } +} diff --git a/sda/schemas/federated/dataset-deprecate.json b/sda/schemas/federated/dataset-deprecate.json index 3844f0659..620776446 100644 --- a/sda/schemas/federated/dataset-deprecate.json +++ b/sda/schemas/federated/dataset-deprecate.json @@ -1,6 +1,6 @@ { "title": "JSON schema for Local EGA dataset deprecation message interface", - "$id": "https://github.com/EGA-archive/LocalEGA/tree/master/schemas/dataset-deprecate.json", + "$id": "https://github.com/neicnordic/sensitive-data-archive/tree/master/sda/schemas/federated/dataset-deprecate.json", "$schema": "http://json-schema.org/draft-07/schema", "type": "object", "required": [ diff --git a/sda/schemas/federated/dataset-mapping.json b/sda/schemas/federated/dataset-mapping.json index 574e0743d..07a631834 100644 --- a/sda/schemas/federated/dataset-mapping.json +++ b/sda/schemas/federated/dataset-mapping.json @@ -1,6 +1,6 @@ { "title": "JSON schema for Local EGA dataset mapping message interface", - "$id": "https://github.com/EGA-archive/LocalEGA/tree/master/schemas/dataset-mapping.json", + "$id": "https://github.com/neicnordic/sensitive-data-archive/tree/master/sda/schemas/federated/dataset-mapping.json", "$schema": "http://json-schema.org/draft-07/schema", "type": "object", "required": [ diff --git a/sda/schemas/federated/dataset-release.json b/sda/schemas/federated/dataset-release.json index e1e688fb4..0a4ccf2c2 100644 --- a/sda/schemas/federated/dataset-release.json +++ b/sda/schemas/federated/dataset-release.json @@ -1,6 +1,6 @@ { "title": "JSON schema for Local EGA dataset release message interface", - "$id": "https://github.com/EGA-archive/LocalEGA/tree/master/schemas/dataset-release.json", + "$id": "https://github.com/neicnordic/sensitive-data-archive/tree/master/sda/schemas/federated/dataset-release.json", "$schema": "http://json-schema.org/draft-07/schema", "type": "object", "required": [ diff --git a/sda/schemas/federated/inbox-remove.json b/sda/schemas/federated/inbox-remove.json index ed19753e5..23acc6603 100644 --- a/sda/schemas/federated/inbox-remove.json +++ b/sda/schemas/federated/inbox-remove.json @@ -1,6 +1,6 @@ { "title": "JSON schema for Local EGA inbox remove message interface", - "$id": "https://github.com/EGA-archive/LocalEGA/tree/master/schemas/inbox-remove.json", + "$id": "https://github.com/neicnordic/sensitive-data-archive/tree/master/sda/schemas/federated/inbox-remove.json", "$schema": "http://json-schema.org/draft-07/schema", "type": "object", "required": [ diff --git a/sda/schemas/federated/inbox-rename.json b/sda/schemas/federated/inbox-rename.json index e8aa5ae4e..a8e185983 100644 --- a/sda/schemas/federated/inbox-rename.json +++ b/sda/schemas/federated/inbox-rename.json @@ -1,6 +1,6 @@ { "title": "JSON schema for Local EGA inbox rename message interface", - "$id": "https://github.com/EGA-archive/LocalEGA/tree/master/schemas/inbox-rename.json", + "$id": "https://github.com/neicnordic/sensitive-data-archive/tree/master/sda/schemas/federated/inbox-rename.json", "$schema": "http://json-schema.org/draft-07/schema", "type": "object", "required": [ diff --git a/sda/schemas/federated/inbox-upload.json b/sda/schemas/federated/inbox-upload.json index 
02ab62659..4158a12a9 100644 --- a/sda/schemas/federated/inbox-upload.json +++ b/sda/schemas/federated/inbox-upload.json @@ -1,6 +1,6 @@ { "title": "JSON schema for Local EGA inbox upload message interface", - "$id": "https://github.com/EGA-archive/LocalEGA/tree/master/schemas/inbox-upload.json", + "$id": "https://github.com/neicnordic/sensitive-data-archive/tree/master/sda/schemas/federated/inbox-upload.json", "$schema": "http://json-schema.org/draft-07/schema", "type": "object", "required": [ diff --git a/sda/schemas/federated/info-error.json b/sda/schemas/federated/info-error.json index 881e835cd..5ac8dcf85 100644 --- a/sda/schemas/federated/info-error.json +++ b/sda/schemas/federated/info-error.json @@ -1,6 +1,6 @@ { "title": "JSON schema for Local EGA dataset mapping message interface", - "$id": "https://github.com/neicnordic/sda-pipeline/tree/master/schemas/info-error.json", + "$id": "https://github.com/neicnordic/sensitive-data-archive/tree/master/sda/schemas/federated/info-error.json", "$schema": "http://json-schema.org/draft-07/schema", "type": "object", "required": [ diff --git a/sda/schemas/federated/ingestion-accession-request.json b/sda/schemas/federated/ingestion-accession-request.json index 61bd8ba56..0030e6863 100644 --- a/sda/schemas/federated/ingestion-accession-request.json +++ b/sda/schemas/federated/ingestion-accession-request.json @@ -1,6 +1,6 @@ { "title": "JSON schema for Local EGA message interface for requesting an Accession ID to Central EGA", - "$id": "https://github.com/EGA-archive/LocalEGA/tree/master/schemas/ingestion-accession-request.json", + "$id": "https://github.com/neicnordic/sensitive-data-archive/tree/master/sda/schemas/federated/ingestion-accession-request.json", "$schema": "http://json-schema.org/draft-07/schema", "type": "object", "required": [ diff --git a/sda/schemas/federated/ingestion-accession.json b/sda/schemas/federated/ingestion-accession.json index a8d6b4799..b39de8422 100644 --- a/sda/schemas/federated/ingestion-accession.json +++ b/sda/schemas/federated/ingestion-accession.json @@ -1,6 +1,6 @@ { "title": "JSON schema for Local EGA accession message interface", - "$id": "https://github.com/EGA-archive/LocalEGA/tree/master/schemas/ingestion-accession.json", + "$id": "https://github.com/neicnordic/sensitive-data-archive/tree/master/sda/schemas/federated/ingestion-accession.json", "$schema": "http://json-schema.org/draft-07/schema", "type": "object", "required": [ diff --git a/sda/schemas/federated/ingestion-completion.json b/sda/schemas/federated/ingestion-completion.json index 06854fc78..9218a19ce 100644 --- a/sda/schemas/federated/ingestion-completion.json +++ b/sda/schemas/federated/ingestion-completion.json @@ -1,6 +1,6 @@ { "title": "JSON schema for Local EGA message completion to Central EGA", - "$id": "https://github.com/EGA-archive/LocalEGA/tree/master/schemas/ingestion-completion.json", + "$id": "https://github.com/neicnordic/sensitive-data-archive/tree/master/sda/schemas/federated/ingestion-completion.json", "$schema": "http://json-schema.org/draft-07/schema", "type": "object", "required": [ diff --git a/sda/schemas/federated/ingestion-trigger.json b/sda/schemas/federated/ingestion-trigger.json index 404733939..87bb270e6 100644 --- a/sda/schemas/federated/ingestion-trigger.json +++ b/sda/schemas/federated/ingestion-trigger.json @@ -1,6 +1,6 @@ { "title": "JSON schema for Local EGA ingestion trigger message interface", - "$id": "https://github.com/EGA-archive/LocalEGA/tree/master/schemas/ingestion-ingest.json", + "$id": 
"https://github.com/neicnordic/sensitive-data-archive/tree/master/sda/schemas/federated/ingestion-trigger.json", "$schema": "http://json-schema.org/draft-07/schema", "type": "object", "required": [ diff --git a/sda/schemas/federated/ingestion-user-error.json b/sda/schemas/federated/ingestion-user-error.json index 8599ea147..f94f67386 100644 --- a/sda/schemas/federated/ingestion-user-error.json +++ b/sda/schemas/federated/ingestion-user-error.json @@ -1,6 +1,6 @@ { "title": "JSON schema for Local EGA message interface to Central EGA", - "$id": "https://github.com/EGA-archive/LocalEGA/tree/master/schemas/ingestion-user-error.json", + "$id": "https://github.com/neicnordic/sensitive-data-archive/tree/master/sda/schemas/federated/ingestion-user-error.json", "$schema": "http://json-schema.org/draft-07/schema", "type": "object", "required": [ diff --git a/sda/schemas/federated/ingestion-verification.json b/sda/schemas/federated/ingestion-verification.json index fd63dcbf5..9e645d089 100644 --- a/sda/schemas/federated/ingestion-verification.json +++ b/sda/schemas/federated/ingestion-verification.json @@ -1,6 +1,6 @@ { "title": "JSON schema for SDA verification message interface", - "$id": "https://github.com/neicnordic/sda-pipeline/tree/master/schemas/ingestion-verification.json", + "$id": "https://github.com/neicnordic/sensitive-data-archive/tree/master/sda/schemas/federated/ingestion-verification.json", "$schema": "http://json-schema.org/draft-07/schema", "type": "object", "required": [ diff --git a/sda/schemas/isolated/dataset-deprecate.json b/sda/schemas/isolated/dataset-deprecate.json new file mode 120000 index 000000000..c3fd79a98 --- /dev/null +++ b/sda/schemas/isolated/dataset-deprecate.json @@ -0,0 +1 @@ +../federated/dataset-deprecate.json \ No newline at end of file diff --git a/sda/schemas/isolated/dataset-mapping.json b/sda/schemas/isolated/dataset-mapping.json index 603713c4d..28f736d72 100644 --- a/sda/schemas/isolated/dataset-mapping.json +++ b/sda/schemas/isolated/dataset-mapping.json @@ -1,6 +1,6 @@ { "title": "JSON schema for dataset mapping message interface. 
Derived from Federated EGA schemas.", - "$id": "https://github.com/EGA-archive/LocalEGA/tree/master/schemas/dataset-mapping.json", + "$id": "https://github.com/neicnordic/sensitive-data-archive/tree/master/sda/schemas/isolated/dataset-mapping.json", "$schema": "http://json-schema.org/draft-07/schema", "type": "object", "required": [ diff --git a/sda/schemas/isolated/dataset-release.json b/sda/schemas/isolated/dataset-release.json new file mode 120000 index 000000000..e22bb197c --- /dev/null +++ b/sda/schemas/isolated/dataset-release.json @@ -0,0 +1 @@ +../federated/dataset-release.json \ No newline at end of file diff --git a/sda/schemas/isolated/inbox-remove.json b/sda/schemas/isolated/inbox-remove.json index c371d6b2b..1c71456b0 100644 --- a/sda/schemas/isolated/inbox-remove.json +++ b/sda/schemas/isolated/inbox-remove.json @@ -1,6 +1,6 @@ { "title": "JSON schema for Local EGA inbox remove message interface", - "$id": "https://github.com/EGA-archive/LocalEGA/tree/master/schemas/inbox-remove.json", + "$id": "https://github.com/neicnordic/sensitive-data-archive/tree/master/sda/schemas/isolated/inbox-remove.json", "$schema": "http://json-schema.org/draft-07/schema", "type": "object", "required": [ diff --git a/sda/schemas/isolated/inbox-rename.json b/sda/schemas/isolated/inbox-rename.json index d557b2c87..445376684 100644 --- a/sda/schemas/isolated/inbox-rename.json +++ b/sda/schemas/isolated/inbox-rename.json @@ -1,6 +1,6 @@ { "title": "JSON schema for Local EGA inbox rename message interface", - "$id": "https://github.com/EGA-archive/LocalEGA/tree/master/schemas/inbox-rename.json", + "$id": "https://github.com/neicnordic/sensitive-data-archive/tree/master/sda/schemas/isolated/inbox-rename.json", "$schema": "http://json-schema.org/draft-07/schema", "type": "object", "required": [ diff --git a/sda/schemas/isolated/inbox-upload.json b/sda/schemas/isolated/inbox-upload.json index 05a6585b1..a21ab542b 100644 --- a/sda/schemas/isolated/inbox-upload.json +++ b/sda/schemas/isolated/inbox-upload.json @@ -1,6 +1,6 @@ { "title": "JSON schema for Local EGA inbox upload message interface", - "$id": "https://github.com/EGA-archive/LocalEGA/tree/master/schemas/inbox-upload.json", + "$id": "https://github.com/neicnordic/sensitive-data-archive/tree/master/sda/schemas/isolated/inbox-upload.json", "$schema": "http://json-schema.org/draft-07/schema", "type": "object", "required": [ diff --git a/sda/schemas/isolated/ingestion-accession.json b/sda/schemas/isolated/ingestion-accession.json index 844e363d1..4b3cdfe16 100644 --- a/sda/schemas/isolated/ingestion-accession.json +++ b/sda/schemas/isolated/ingestion-accession.json @@ -1,6 +1,6 @@ { "title": "JSON schema for accession message interface. Derived from Federated EGA schemas.", - "$id": "https://github.com/EGA-archive/LocalEGA/tree/master/schemas/ingestion-accession.json", + "$id": "https://github.com/neicnordic/sensitive-data-archive/tree/master/sda/schemas/isolated/ingestion-accession.json", "$schema": "http://json-schema.org/draft-07/schema", "type": "object", "required": [ diff --git a/sda/schemas/isolated/ingestion-completion.json b/sda/schemas/isolated/ingestion-completion.json index 6e6034381..f6ce335da 100644 --- a/sda/schemas/isolated/ingestion-completion.json +++ b/sda/schemas/isolated/ingestion-completion.json @@ -1,6 +1,6 @@ { "title": "JSON schema for sending message for ingestion completion. 
Derived from Federated EGA schemas.", - "$id": "https://github.com/EGA-archive/LocalEGA/tree/master/schemas/ingestion-completion.json", + "$id": "https://github.com/neicnordic/sensitive-data-archive/tree/master/sda/schemas/isolated/ingestion-completion.json", "$schema": "http://json-schema.org/draft-07/schema", "type": "object", "required": [ @@ -117,7 +117,7 @@ "$id": "#/properties/decrypted_checksums", "type": "array", "title": "The checksums of the original file", - "description": "The checksums of the original file. The md5 one is required", + "description": "The checksums of the original file.", "examples": [ [ { @@ -134,7 +134,7 @@ "type": "object", "properties": { "type": { - "const": "md5" + "const": "sha256" } }, "required": [