Skip to content

Commit

Permalink
api: add a new endpoint to publish a census async
Browse files Browse the repository at this point in the history
Since the publish of the census can take some time, allow
the API clients to execute the operation async.

So two new endpoints are introduced:

/censuses/:censusId/publish/async
/censuses/:censusId/check

Signed-off-by: p4u <[email protected]>
  • Loading branch information
p4u committed Mar 12, 2024
1 parent 19bc1eb commit a761b29
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 26 deletions.
3 changes: 3 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"path/filepath"
"strings"
"sync"

"go.vocdoni.io/dvote/api/censusdb"
"go.vocdoni.io/dvote/data"
Expand Down Expand Up @@ -69,6 +70,8 @@ type API struct {
vocinfo *vochaininfo.VochainInfo
censusdb *censusdb.CensusDB
db db.Database // used for internal db operations

censusPublishStatusMap sync.Map // used to store the status of the census publishing process when async
}

// NewAPI creates a new instance of the API. Attach must be called next.
Expand Down
139 changes: 115 additions & 24 deletions api/censuses.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"strings"
"time"

Expand Down Expand Up @@ -111,6 +112,22 @@ func (a *API) enableCensusHandlers() error {
); err != nil {
return err
}
if err := a.Endpoint.RegisterMethod(
"/censuses/{censusID}/publish/async",
"POST",
apirest.MethodAccessTypePublic,
a.censusPublishHandler,
); err != nil {
return err
}
if err := a.Endpoint.RegisterMethod(
"/censuses/{censusID}/check",
"GET",
apirest.MethodAccessTypePublic,
a.censusPublishCheckHandler,
); err != nil {
return err
}
if err := a.Endpoint.RegisterMethod(
"/censuses/{censusID}/publish/{root}",
"POST",
Expand Down Expand Up @@ -615,7 +632,7 @@ func (a *API) censusDeleteHandler(msg *apirest.APIdata, ctx *httprouter.HTTPCont
// @Success 200 {object} object{census=object{censusID=string,uri=string}} "It return published censusID and the ipfs uri where its uploaded"
// @Param censusID path string true "Census id"
// @Router /censuses/{censusID}/publish [post]
// /censuses/{censusID}/publish/{root} [post] Endpoint docs generated on docs/models/model.go
// @Router /censuses/{censusID}/publish/async [post]
func (a *API) censusPublishHandler(msg *apirest.APIdata, ctx *httprouter.HTTPContext) error {
token, err := uuid.Parse(msg.AuthToken)
if err != nil {
Expand All @@ -626,6 +643,10 @@ func (a *API) censusPublishHandler(msg *apirest.APIdata, ctx *httprouter.HTTPCon
return err
}

// check if the request is async
url := strings.Split(ctx.Request.URL.Path, "/")
async := url[len(url)-1] == "async"

ref, err := a.censusdb.Load(censusID, &token)
defer a.censusdb.UnLoad()
if err != nil {
Expand Down Expand Up @@ -665,6 +686,11 @@ func (a *API) censusPublishHandler(msg *apirest.APIdata, ctx *httprouter.HTTPCon
}
return err
}
// if async, store the URI in the map for the check endpoint
if async {
a.censusPublishStatusMap.Store(hex.EncodeToString(root), ref.URI)
}

var data []byte
if data, err = json.Marshal(&Census{
CensusID: root,
Expand All @@ -675,36 +701,64 @@ func (a *API) censusPublishHandler(msg *apirest.APIdata, ctx *httprouter.HTTPCon
return ctx.Send(data, apirest.HTTPstatusOK)
}

// dump the current tree to import them after
dump, err := ref.Tree().Dump()
if err != nil {
return err
}

// export the tree to the remote storage (IPFS)
uri := ""
if a.storage != nil {
exportData, err := censusdb.BuildExportDump(root, dump,
models.Census_Type(ref.CensusType), ref.MaxLevels)
publishCensus := func() (string, error) {
// dump the current tree to import them after
dump, err := ref.Tree().Dump()
if err != nil {
return err
return "", err
}
sctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
cid, err := a.storage.Publish(sctx, exportData)

// export the tree to the remote storage (IPFS)
uri := ""
if a.storage != nil {
exportData, err := censusdb.BuildExportDump(root, dump,
models.Census_Type(ref.CensusType), ref.MaxLevels)
if err != nil {
return "", err
}
sctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
cid, err := a.storage.Publish(sctx, exportData)
if err != nil {
log.Errorf("could not export tree to storage: %v", err)
} else {
uri = a.storage.URIprefix() + cid
}
}

newRef, err := a.censusdb.New(root, models.Census_Type(ref.CensusType), uri,
nil, ref.MaxLevels)
if err != nil {
log.Errorf("could not export tree to storage: %v", err)
} else {
uri = a.storage.URIprefix() + cid
return "", err
}
return uri, newRef.Tree().ImportDump(dump)
}

newRef, err := a.censusdb.New(root, models.Census_Type(ref.CensusType), uri,
nil, ref.MaxLevels)
if err != nil {
return err
if async {
a.censusPublishStatusMap.Store(hex.EncodeToString(root), "")

go func() {
uri, err := publishCensus()
if err != nil {
log.Errorw(err, "could not publish census")
a.censusPublishStatusMap.Store(hex.EncodeToString(root), fmt.Sprintf("error: %v", err.Error()))
return
}
a.censusPublishStatusMap.Store(hex.EncodeToString(root), uri)
}()

var data []byte
if data, err = json.Marshal(&Census{
CensusID: root,
}); err != nil {
return err
}

return ctx.Send(data, apirest.HTTPstatusOK)
}
if err := newRef.Tree().ImportDump(dump); err != nil {

uri, err := publishCensus()
if err != nil {
return err
}

Expand All @@ -719,6 +773,43 @@ func (a *API) censusPublishHandler(msg *apirest.APIdata, ctx *httprouter.HTTPCon
return ctx.Send(data, apirest.HTTPstatusOK)
}

// censusPublishCheckHandler
//
// @Summary Check census publish status
// @Description.markdown censusPublishCheckHandler
// @Tags Censuses
// @Produce json
// @Success 200 {object} object{census=object{censusID=string,uri=string}} "It return published censusID and the ipfs uri where its uploaded"
// @Param censusID path string true "Census id"
// @Router /censuses/{censusID}/check [get]
func (a *API) censusPublishCheckHandler(_ *apirest.APIdata, ctx *httprouter.HTTPContext) error {
censusID, err := censusIDparse(ctx.URLParam("censusID"))
if err != nil {
return err
}
uriOrErrorAny, ok := a.censusPublishStatusMap.Load(hex.EncodeToString(censusID))
if !ok {
return ErrCensusNotFound
}
log.Warnf("census publish status: %v", uriOrErrorAny)
uriOrError := uriOrErrorAny.(string)
if uriOrError == "" {
return ctx.Send(nil, apirest.HTTPstatusNoContent)
}
if strings.HasPrefix(uriOrError, "error:") {
return ErrCensusBuild.With(uriOrError[7:])
}
var data []byte
if data, err = json.Marshal(&Census{
CensusID: censusID,
URI: uriOrError,
}); err != nil {
return err
}
a.censusPublishStatusMap.Delete(hex.EncodeToString(censusID))
return ctx.Send(data, apirest.HTTPstatusOK)
}

// censusProofHandler
//
// @Summary Prove key to census
Expand Down
1 change: 1 addition & 0 deletions api/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,5 @@ var (
ErrCantCountVotes = apirest.APIerror{Code: 5029, HTTPstatus: apirest.HTTPstatusInternalErr, Err: fmt.Errorf("cannot count votes")}
ErrVochainOverloaded = apirest.APIerror{Code: 5030, HTTPstatus: apirest.HTTPstatusServiceUnavailable, Err: fmt.Errorf("vochain overloaded")}
ErrGettingSIK = apirest.APIerror{Code: 5031, HTTPstatus: apirest.HTTPstatusInternalErr, Err: fmt.Errorf("error getting SIK")}
ErrCensusBuild = apirest.APIerror{Code: 5032, HTTPstatus: apirest.HTTPstatusInternalErr, Err: fmt.Errorf("error building census")}
)
23 changes: 22 additions & 1 deletion apiclient/census.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"math/big"
"time"

"go.vocdoni.io/dvote/api"
"go.vocdoni.io/dvote/httprouter/apirest"
Expand Down Expand Up @@ -79,17 +80,37 @@ func (c *HTTPclient) CensusSize(censusID types.HexBytes) (uint64, error) {
// CensusPublish publishes a census to the distributed data storage and returns its root hash
// and storage URI.
func (c *HTTPclient) CensusPublish(censusID types.HexBytes) (types.HexBytes, string, error) {
resp, code, err := c.Request(HTTPPOST, nil, "censuses", censusID.String(), "publish")
resp, code, err := c.Request(HTTPPOST, nil, "censuses", censusID.String(), "publish", "async")
if err != nil {
return nil, "", err
}
if code != apirest.HTTPstatusOK {
return nil, "", fmt.Errorf("%s: %d (%s)", errCodeNot200, code, resp)
}

censusData := &api.Census{}
if err := json.Unmarshal(resp, censusData); err != nil {
return nil, "", fmt.Errorf("could not unmarshal response: %w", err)
}

// wait for the census to be ready and get the root hash and storage URI
for {
time.Sleep(2 * time.Second)
resp, code, err := c.Request(HTTPGET, nil, "censuses", censusData.CensusID.String(), "check")
if err != nil {
return nil, "", err
}
if code == apirest.HTTPstatusOK {
if err := json.Unmarshal(resp, censusData); err != nil {
return nil, "", fmt.Errorf("could not unmarshal response: %w", err)
}
break
}
if code == apirest.HTTPstatusNoContent {
continue
}
return nil, "", fmt.Errorf("%s: %d (%s)", errCodeNot200, code, resp)
}
return censusData.CensusID, censusData.URI, nil
}

Expand Down
2 changes: 1 addition & 1 deletion dockerfiles/testsuite/env.gateway0
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
VOCDONI_DATADIR=/app/run
VOCDONI_MODE=gateway
VOCDONI_LOGLEVEL=debug
VOCDONI_VOCHAIN_LOGLEVEL=info
VOCDONI_VOCHAIN_LOGLEVEL=error
VOCDONI_DEV=True
VOCDONI_ENABLEAPI=True
VOCDONI_ENABLERPC=True
Expand Down

0 comments on commit a761b29

Please sign in to comment.