Skip to content

Commit

Permalink
Worker fix. Endpoint fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Zaptoss committed Aug 23, 2024
1 parent 2127a00 commit 8c6380c
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 60 deletions.
4 changes: 2 additions & 2 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,15 @@ event_types:
no_auto_open: true
- name: daily_question
title: Get daily question
reward: 0
reward: 5
frequency: unlimited
description: This event is used when a user get daily question
short_description: event for get daily question
auto_claim: true
no_auto_open: true

daily_questions:
timezone: +4
timezone: 4

levels:
downgradeable: false
Expand Down
16 changes: 9 additions & 7 deletions internal/cli/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/rarimo/geo-points-svc/internal/config"
"github.com/rarimo/geo-points-svc/internal/data/evtypes"
"github.com/rarimo/geo-points-svc/internal/service"
"github.com/rarimo/geo-points-svc/internal/service/workers/cleanquestiondeadlines"
"github.com/rarimo/geo-points-svc/internal/service/workers/expirywatch"
"github.com/rarimo/geo-points-svc/internal/service/workers/leveljustice"
"github.com/rarimo/geo-points-svc/internal/service/workers/nooneisforgotten"
Expand All @@ -17,12 +18,12 @@ import (
func runServices(ctx context.Context, cfg config.Config, wg *sync.WaitGroup) {
// signals indicate the finished initialization of each worker
var (
reopenerSig = make(chan struct{})
expiryWatchSig = make(chan struct{})
evTypesSig = make(chan struct{})
noOneIsForgottenSig = make(chan struct{})
levelJustice = make(chan struct{})
// cleanDQuestionDeadlines = make(chan struct{})
reopenerSig = make(chan struct{})
expiryWatchSig = make(chan struct{})
evTypesSig = make(chan struct{})
noOneIsForgottenSig = make(chan struct{})
levelJustice = make(chan struct{})
cleanDQuestionDeadlines = make(chan struct{})
)

run := func(f func()) {
Expand Down Expand Up @@ -50,7 +51,8 @@ func runServices(ctx context.Context, cfg config.Config, wg *sync.WaitGroup) {
run(func() { leveljustice.Run(cfg, levelJustice) })

//service for cleaning daily question deadlines after day
// run(func() { cleanquestiondeadlines.Run(ctx, cfg, cleanDQuestionDeadlines) })
run(func() { cleanquestiondeadlines.Run(ctx, cfg, cleanDQuestionDeadlines) })
<-cleanDQuestionDeadlines

// service depends on all the workers for good UX
<-expiryWatchSig
Expand Down
6 changes: 6 additions & 0 deletions internal/data/daily_questions.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ import (
"github.com/rarimo/geo-points-svc/resources"
)

const (
ColCorrectAnswers = "num_correct_answers"
ColIncorrectAnswers = "num_incorrect_answers"
ColAllParticipants = "num_all_participants"
)

type DailyQuestion struct {
ID int64 `db:"id"`
Title string `db:"title"`
Expand Down
1 change: 1 addition & 0 deletions internal/data/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type EventsQ interface {
SelectAbsentTypes(allTypes ...string) ([]ReopenableEvent, error)

FilterByID(...string) EventsQ
FilterByQuestionID(int) EventsQ
FilterByNullifier(...string) EventsQ
FilterByStatus(...EventStatus) EventsQ
FilterByType(...string) EventsQ
Expand Down
4 changes: 4 additions & 0 deletions internal/data/pg/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,10 @@ func (q *events) FilterTodayEvents(offset int) data.EventsQ {
return res
}

func (q *events) FilterByQuestionID(id int) data.EventsQ {
return q.applyCondition(squirrel.Eq{"meta->>'question_id'": id})
}

func (q *events) FilterInactiveNotClaimed(types ...string) data.EventsQ {
if len(types) == 0 {
return q
Expand Down
12 changes: 7 additions & 5 deletions internal/service/handlers/daily_question_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,13 @@ func CheckDailyQuestion(w http.ResponseWriter, r *http.Request) {
ape.Render(w, newDailyAnswer(question))
}

func newDailyAnswer(question *data.DailyQuestion) resources.DailyQuestionAnswers {
return resources.DailyQuestionAnswers{
Key: resources.NewKeyInt64(question.ID, resources.DAILY_QUESTIONS),
Attributes: resources.DailyQuestionAnswersAttributes{
Answer: question.CorrectAnswer,
func newDailyAnswer(question *data.DailyQuestion) resources.DailyQuestionAnswersResponse {
return resources.DailyQuestionAnswersResponse{
Data: resources.DailyQuestionAnswers{
Key: resources.NewKeyInt64(question.ID, resources.DAILY_QUESTIONS),
Attributes: resources.DailyQuestionAnswersAttributes{
Answer: question.CorrectAnswer,
},
},
}
}
20 changes: 11 additions & 9 deletions internal/service/handlers/daily_question_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,18 @@ func GetDailyQuestion(w http.ResponseWriter, r *http.Request) {
ape.Render(w, newDailyQuestions(question, options))
}

func newDailyQuestions(question *data.DailyQuestion, options []resources.DailyQuestionOptions) resources.DailyQuestions {
func newDailyQuestions(question *data.DailyQuestion, options []resources.DailyQuestionOptions) resources.DailyQuestionsResponse {

return resources.DailyQuestions{
Key: resources.Key{
ID: strconv.Itoa(int(question.ID)),
Type: resources.DAILY_QUESTIONS,
},
Attributes: resources.DailyQuestionsAttributes{
Options: options,
Title: question.Title,
return resources.DailyQuestionsResponse{
Data: resources.DailyQuestions{
Key: resources.Key{
ID: strconv.Itoa(int(question.ID)),
Type: resources.DAILY_QUESTIONS,
},
Attributes: resources.DailyQuestionsAttributes{
Options: options,
Title: question.Title,
},
},
}
}
16 changes: 9 additions & 7 deletions internal/service/handlers/daily_questions_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,15 @@ func GetDailyQuestionsStatus(w http.ResponseWriter, r *http.Request) {
ape.Render(w, newDailyQuestionsStatus(question))
}

func newDailyQuestionsStatus(question *data.DailyQuestion) resources.DailyQuestionsStatus {
return resources.DailyQuestionsStatus{
Key: resources.NewKeyInt64(question.ID, resources.DAILY_QUESTIONS),
Attributes: resources.DailyQuestionsStatusAttributes{
NextQuestionDate: question.StartsAt.Unix(),
Reward: int64(question.Reward),
TimeForAnswer: question.TimeForAnswer,
func newDailyQuestionsStatus(question *data.DailyQuestion) resources.DailyQuestionsStatusResponse {
return resources.DailyQuestionsStatusResponse{
Data: resources.DailyQuestionsStatus{
Key: resources.NewKeyInt64(question.ID, resources.DAILY_QUESTIONS_STATUS),
Attributes: resources.DailyQuestionsStatusAttributes{
NextQuestionDate: question.StartsAt.Unix(),
Reward: int64(question.Reward),
TimeForAnswer: question.TimeForAnswer,
},
},
}
}
Expand Down
66 changes: 36 additions & 30 deletions internal/service/workers/cleanquestiondeadlines/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,52 +7,58 @@ import (

"github.com/go-co-op/gocron/v2"
"github.com/rarimo/geo-points-svc/internal/config"
"github.com/rarimo/geo-points-svc/internal/data"
"github.com/rarimo/geo-points-svc/internal/data/pg"
"github.com/rarimo/geo-points-svc/internal/service/workers/cron"
)

func Run(ctx context.Context, cfg config.Config, sig chan struct{}) {
cron.Init(cfg.Log())
log := cfg.Log().WithField("who", "daily-questions-cleaner")

eventsQ := pg.NewEvents(cfg.DB().Clone())
questionsQ := pg.NewDailyQuestionsQ(cfg.DB().Clone())

offset := cfg.DailyQuestions().LocalTime(atDayStart(time.Now().UTC())).Hour()
_, err := cron.NewJob(
gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(uint(offset), 0, 0))),
gocron.NewTask(func() {
cfg.DailyQuestions()
counts := cfg.DailyQuestions().ClearDeadlines()
if len(counts) == 0 {
log.Infof("Questions absent")
return
}

err := eventsQ.New().Transaction(func() error {
for k := range counts {
count, err := eventsQ.New().FilterByQuestionID(k).Count()
if err != nil {
return fmt.Errorf("failed to get count events by question id: %w", err)
}

err = questionsQ.FilterByID(int64(k)).Update(map[string]any{
data.ColCorrectAnswers: count,
data.ColAllParticipants: counts[k],
})
if err != nil {
return fmt.Errorf("failed to update daily question: %w", err)
}
log.WithField("question_id", k).Infof("Correct answers: %d; Total participants: %d", count, counts[k])
}
return nil
})
if err != nil {
log.WithError(err).Error("Failed to correct update question statistic")
}
}),
gocron.WithName("daily-questions-cleaner"),
)
if err != nil {
panic(fmt.Errorf(": failed to initialize daily job: %w", err))
panic(fmt.Errorf("failed to initialize daily job: %w", err))
}
sig <- struct{}{}

for {
now := time.Now().UTC().Add(time.Duration(offset) * time.Hour)
cfg.Log().Info("Daily Question cleaning start")
nextMidnight := time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, time.UTC).
Add(time.Duration(offset) * time.Hour)

durationUntilNextTick := nextMidnight.Sub(now)

timer := time.NewTimer(durationUntilNextTick)

select {
case <-timer.C:
res := cfg.DailyQuestions().ClearDeadlines()
cfg.Log().Infof("Cleared daily questions quantity: %v", res)

timer.Stop()

case <-sig:
cfg.Log().Info("Daily Question cleaning stop")
timer.Stop()
return

case <-ctx.Done():
cfg.Log().Info("Daily Question cleaning stop")
timer.Stop()
return
}
}
cron.Start(ctx)
}

func atDayStart(date time.Time) time.Time {
Expand Down

0 comments on commit 8c6380c

Please sign in to comment.