Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add namespace to synchronizer messages too #6

Merged
merged 1 commit into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions adapters/incluster/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (c *Client) Start(ctx context.Context) error {
}
newObject, err := d.MarshalJSON()
if err != nil {
logger.L().Error("cannot marshal object", helpers.Error(err), helpers.String("resource", c.res.Resource), helpers.String("key", id.String()))
logger.L().Error("cannot marshal object", helpers.Error(err), helpers.String("resource", c.res.Resource), helpers.String("id", id.String()))
continue
}
switch {
Expand Down Expand Up @@ -217,7 +217,7 @@ func (c *Client) GetObject(ctx context.Context, id domain.KindName, baseObject [
func (c *Client) PatchObject(ctx context.Context, id domain.KindName, checksum string, patch []byte) error {
baseObject, err := c.patchObject(ctx, id, checksum, patch)
if err != nil {
logger.L().Warning("patch object, sending get object", helpers.Error(err))
logger.L().Warning("patch object, sending get object", helpers.Error(err), helpers.String("id", id.String()))
return c.callbacks.GetObject(ctx, id, baseObject)
}
return nil
Expand Down Expand Up @@ -278,7 +278,7 @@ func (c *Client) Callbacks(_ context.Context) (domain.Callbacks, error) {
func (c *Client) VerifyObject(ctx context.Context, id domain.KindName, newChecksum string) error {
baseObject, err := c.verifyObject(id, newChecksum)
if err != nil {
logger.L().Warning("verify object, sending get object", helpers.Error(err))
logger.L().Warning("verify object, sending get object", helpers.Error(err), helpers.String("id", id.String()))
return c.callbacks.GetObject(ctx, id, baseObject)
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions adapters/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (m *MockAdapter) GetObject(ctx context.Context, id domain.KindName, baseObj
func (m *MockAdapter) PatchObject(ctx context.Context, id domain.KindName, checksum string, patch []byte) error {
baseObject, err := m.patchObject(id, checksum, patch)
if err != nil {
logger.L().Warning("patch object, sending get object", helpers.Error(err))
logger.L().Warning("patch object, sending get object", helpers.Error(err), helpers.String("id", id.String()))
return m.callbacks.GetObject(ctx, id, baseObject)
}
return nil
Expand Down Expand Up @@ -112,7 +112,7 @@ func (m *MockAdapter) Start(_ context.Context) error {
func (m *MockAdapter) VerifyObject(ctx context.Context, id domain.KindName, newChecksum string) error {
baseObject, err := m.verifyObject(id, newChecksum)
if err != nil {
logger.L().Warning("verify object, sending get object", helpers.Error(err))
logger.L().Warning("verify object, sending get object", helpers.Error(err), helpers.String("id", id.String()))
return m.callbacks.GetObject(ctx, id, baseObject)
}
return nil
Expand Down
17 changes: 17 additions & 0 deletions api/asyncapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ components:
$ref: '#/components/schemas/msgID'
name:
$ref: '#/components/schemas/name'
namespace:
$ref: '#/components/schemas/namespace'
objectAdded:
type: object
properties:
Expand All @@ -113,6 +115,8 @@ components:
$ref: '#/components/schemas/msgID'
name:
$ref: '#/components/schemas/name'
namespace:
$ref: '#/components/schemas/namespace'
object:
$ref: '#/components/schemas/object'
objectDeleted:
Expand All @@ -128,6 +132,8 @@ components:
$ref: '#/components/schemas/msgID'
name:
$ref: '#/components/schemas/name'
namespace:
$ref: '#/components/schemas/namespace'
objectModified:
type: object
properties:
Expand All @@ -143,6 +149,8 @@ components:
$ref: '#/components/schemas/msgID'
name:
$ref: '#/components/schemas/name'
namespace:
$ref: '#/components/schemas/namespace'
object:
$ref: '#/components/schemas/object'
getObject:
Expand All @@ -160,6 +168,8 @@ components:
$ref: '#/components/schemas/msgID'
name:
$ref: '#/components/schemas/name'
namespace:
$ref: '#/components/schemas/namespace'
patchObject:
type: object
properties:
Expand All @@ -175,6 +185,8 @@ components:
$ref: '#/components/schemas/msgID'
name:
$ref: '#/components/schemas/name'
namespace:
$ref: '#/components/schemas/namespace'
patch:
$ref: '#/components/schemas/object'
putObject:
Expand All @@ -190,6 +202,8 @@ components:
$ref: '#/components/schemas/msgID'
name:
$ref: '#/components/schemas/name'
namespace:
$ref: '#/components/schemas/namespace'
object:
$ref: '#/components/schemas/object'
depth:
Expand Down Expand Up @@ -222,6 +236,9 @@ components:
name:
type: string
description: name of the object
namespace:
type: string
description: namespace of the object
object:
type: string
description: The object is encoded in JSON
Expand Down
98 changes: 59 additions & 39 deletions core/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,15 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error {
continue
}
id := domain.KindName{
Kind: msg.Kind,
Name: msg.Name,
Kind: msg.Kind,
Name: msg.Name,
Namespace: msg.Namespace,
}
err := s.handleSyncGetObject(ctx, id, []byte(msg.BaseObject))
if err != nil {
logger.L().Error("error handling message", helpers.Error(err),
helpers.Interface("event", generic.Event.Value()))
helpers.Interface("event", msg.Event.Value()),
helpers.String("id", id.String()))
continue
}
case domain.EventNewChecksum:
Expand All @@ -176,13 +178,15 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error {
continue
}
id := domain.KindName{
Kind: msg.Kind,
Name: msg.Name,
Kind: msg.Kind,
Name: msg.Name,
Namespace: msg.Namespace,
}
err := s.handleSyncNewChecksum(ctx, id, msg.Checksum)
if err != nil {
logger.L().Error("error handling message", helpers.Error(err),
helpers.Interface("event", generic.Event.Value()))
helpers.Interface("event", msg.Event.Value()),
helpers.String("id", id.String()))
continue
}
case domain.EventObjectDeleted:
Expand All @@ -194,13 +198,15 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error {
continue
}
id := domain.KindName{
Kind: msg.Kind,
Name: msg.Name,
Kind: msg.Kind,
Name: msg.Name,
Namespace: msg.Namespace,
}
err := s.handleSyncObjectDeleted(ctx, id)
if err != nil {
logger.L().Error("error handling message", helpers.Error(err),
helpers.Interface("event", generic.Event.Value()))
helpers.Interface("event", msg.Event.Value()),
helpers.String("id", id.String()))
continue
}
case domain.EventPatchObject:
Expand All @@ -212,13 +218,15 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error {
continue
}
id := domain.KindName{
Kind: msg.Kind,
Name: msg.Name,
Kind: msg.Kind,
Name: msg.Name,
Namespace: msg.Namespace,
}
err := s.handleSyncPatchObject(ctx, id, msg.Checksum, []byte(msg.Patch))
if err != nil {
logger.L().Error("error handling message", helpers.Error(err),
helpers.Interface("event", generic.Event.Value()))
helpers.Interface("event", msg.Event.Value()),
helpers.String("id", id.String()))
continue
}
case domain.EventPutObject:
Expand All @@ -230,13 +238,15 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error {
continue
}
id := domain.KindName{
Kind: msg.Kind,
Name: msg.Name,
Kind: msg.Kind,
Name: msg.Name,
Namespace: msg.Namespace,
}
err := s.handleSyncPutObject(ctx, id, []byte(msg.Object))
if err != nil {
logger.L().Error("error handling message", helpers.Error(err),
helpers.Interface("event", generic.Event.Value()))
helpers.Interface("event", msg.Event.Value()),
helpers.String("id", id.String()))
continue
}
}
Expand Down Expand Up @@ -294,6 +304,7 @@ func (s *Synchronizer) sendGetObject(ctx context.Context, id domain.KindName, ba
Kind: id.Kind,
MsgId: msgId,
Name: id.Name,
Namespace: id.Namespace,
}
data, err := json.Marshal(msg)
if err != nil {
Expand All @@ -308,6 +319,7 @@ func (s *Synchronizer) sendGetObject(ctx context.Context, id domain.KindName, ba
helpers.String("account", clientId.Account),
helpers.String("cluster", clientId.Cluster),
helpers.String("kind", msg.Kind.String()),
helpers.String("namespace", msg.Namespace),
helpers.String("name", msg.Name),
helpers.Int("base object size", len(msg.BaseObject)))
return nil
Expand All @@ -318,12 +330,13 @@ func (s *Synchronizer) sendNewChecksum(ctx context.Context, id domain.KindName,
depth := ctx.Value(domain.ContextKeyDepth).(int)
msgId := ctx.Value(domain.ContextKeyMsgId).(string)
msg := domain.NewChecksum{
Checksum: checksum,
Depth: depth + 1,
Event: &event,
Kind: id.Kind,
MsgId: msgId,
Name: id.Name,
Checksum: checksum,
Depth: depth + 1,
Event: &event,
Kind: id.Kind,
MsgId: msgId,
Name: id.Name,
Namespace: id.Namespace,
}
data, err := json.Marshal(msg)
if err != nil {
Expand All @@ -341,6 +354,7 @@ func (s *Synchronizer) sendNewChecksum(ctx context.Context, id domain.KindName,
helpers.String("account", clientId.Account),
helpers.String("cluster", clientId.Cluster),
helpers.String("kind", msg.Kind.String()),
helpers.String("namespace", msg.Namespace),
helpers.String("name", msg.Name),
helpers.String("checksum", msg.Checksum))
return nil
Expand All @@ -351,11 +365,12 @@ func (s *Synchronizer) sendObjectDeleted(ctx context.Context, id domain.KindName
depth := ctx.Value(domain.ContextKeyDepth).(int)
msgId := ctx.Value(domain.ContextKeyMsgId).(string)
msg := domain.ObjectDeleted{
Depth: depth + 1,
Event: &event,
Kind: id.Kind,
MsgId: msgId,
Name: id.Name,
Depth: depth + 1,
Event: &event,
Kind: id.Kind,
MsgId: msgId,
Name: id.Name,
Namespace: id.Namespace,
}
data, err := json.Marshal(msg)
if err != nil {
Expand All @@ -370,6 +385,7 @@ func (s *Synchronizer) sendObjectDeleted(ctx context.Context, id domain.KindName
helpers.String("account", clientId.Account),
helpers.String("cluster", clientId.Cluster),
helpers.String("kind", msg.Kind.String()),
helpers.String("namespace", msg.Namespace),
helpers.String("name", msg.Name))
return nil
}
Expand All @@ -380,13 +396,14 @@ func (s *Synchronizer) sendPatchObject(ctx context.Context, id domain.KindName,
msgId := ctx.Value(domain.ContextKeyMsgId).(string)

msg := domain.PatchObject{
Checksum: checksum,
Depth: depth + 1,
Event: &event,
Kind: id.Kind,
MsgId: msgId,
Name: id.Name,
Patch: string(patch),
Checksum: checksum,
Depth: depth + 1,
Event: &event,
Kind: id.Kind,
MsgId: msgId,
Name: id.Name,
Namespace: id.Namespace,
Patch: string(patch),
}
data, err := json.Marshal(msg)
if err != nil {
Expand All @@ -403,6 +420,7 @@ func (s *Synchronizer) sendPatchObject(ctx context.Context, id domain.KindName,
helpers.String("account", clientId.Account),
helpers.String("cluster", clientId.Cluster),
helpers.String("kind", msg.Kind.String()),
helpers.String("namespace", msg.Namespace),
helpers.String("name", msg.Name),
helpers.String("checksum", msg.Checksum),
helpers.Int("patch size", len(msg.Patch)))
Expand All @@ -414,12 +432,13 @@ func (s *Synchronizer) sendPutObject(ctx context.Context, id domain.KindName, ob
depth := ctx.Value(domain.ContextKeyDepth).(int)
msgId := ctx.Value(domain.ContextKeyMsgId).(string)
msg := domain.PutObject{
Depth: depth + 1,
Event: &event,
Kind: id.Kind,
MsgId: msgId,
Name: id.Name,
Object: string(object),
Depth: depth + 1,
Event: &event,
Kind: id.Kind,
MsgId: msgId,
Name: id.Name,
Namespace: id.Namespace,
Object: string(object),
}
data, err := json.Marshal(msg)
if err != nil {
Expand All @@ -435,6 +454,7 @@ func (s *Synchronizer) sendPutObject(ctx context.Context, id domain.KindName, ob
helpers.String("account", clientId.Account),
helpers.String("cluster", clientId.Cluster),
helpers.String("kind", msg.Kind.String()),
helpers.String("namespace", msg.Namespace),
helpers.String("name", msg.Name),
helpers.Int("object size", len(msg.Object)))
return nil
Expand Down
1 change: 1 addition & 0 deletions domain/GetObject.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ type GetObject struct {
Kind *Kind
MsgId string
Name string
Namespace string
AdditionalProperties map[string]interface{}
}
1 change: 1 addition & 0 deletions domain/NewChecksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ type NewChecksum struct {
Kind *Kind
MsgId string
Name string
Namespace string
AdditionalProperties map[string]interface{}
}
1 change: 1 addition & 0 deletions domain/ObjectAdded.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type ObjectAdded struct {
Kind *Kind
MsgId string
Name string
Namespace string
Object string
AdditionalProperties map[string]interface{}
}
1 change: 1 addition & 0 deletions domain/ObjectDeleted.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ type ObjectDeleted struct {
Kind *Kind
MsgId string
Name string
Namespace string
AdditionalProperties map[string]interface{}
}
1 change: 1 addition & 0 deletions domain/ObjectModified.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type ObjectModified struct {
Kind *Kind
MsgId string
Name string
Namespace string
Object string
AdditionalProperties map[string]interface{}
}
1 change: 1 addition & 0 deletions domain/PatchObject.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type PatchObject struct {
Kind *Kind
MsgId string
Name string
Namespace string
Patch string
AdditionalProperties map[string]interface{}
}
1 change: 1 addition & 0 deletions domain/PutObject.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ type PutObject struct {
Kind *Kind
MsgId string
Name string
Namespace string
Object string
AdditionalProperties map[string]interface{}
}
Loading
Loading