Skip to content

Commit

Permalink
Update message types for draft-05
Browse files Browse the repository at this point in the history
  • Loading branch information
mengelbart committed Jul 15, 2024
1 parent 85053ac commit 0c18f3a
Show file tree
Hide file tree
Showing 28 changed files with 474 additions and 355 deletions.
2 changes: 1 addition & 1 deletion examples/chat/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (c *Client) Run() error {
err := c.rm.rooms[fields[1]].lt.WriteObject(context.Background(), moqtransport.Object{
GroupID: 0,
ObjectID: 0,
ObjectSendOrder: 0,
PublisherPriority: 0,
ForwardingPreference: moqtransport.ObjectForwardingPreferenceStream,
Payload: []byte(strings.TrimSpace(msg)),
})
Expand Down
2 changes: 1 addition & 1 deletion examples/chat/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (r *room) announceUser(username string, s *moqtransport.Session, arw moqtra
r.catalogTrack.WriteObject(context.Background(), moqtransport.Object{
GroupID: r.catalogGroup,
ObjectID: 0,
ObjectSendOrder: 0,
PublisherPriority: 0,
ForwardingPreference: moqtransport.ObjectForwardingPreferenceStreamTrack,
Payload: []byte(catalog),
})
Expand Down
2 changes: 1 addition & 1 deletion examples/date/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (h *moqHandler) setupDateTrack(ctx context.Context) {
h.localTrack.WriteObject(ctx, moqtransport.Object{
GroupID: id,
ObjectID: 0,
ObjectSendOrder: 0,
PublisherPriority: 0,
ForwardingPreference: moqtransport.ObjectForwardingPreferenceStream,
Payload: []byte(fmt.Sprintf("%v", ts)),
})
Expand Down
10 changes: 5 additions & 5 deletions group_header_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ type groupHeaderStream struct {
stream SendStream
}

func newGroupHeaderStream(stream SendStream, subscribeID, trackAlias, groupID, objectSendOrder uint64) (*groupHeaderStream, error) {
func newGroupHeaderStream(stream SendStream, subscribeID, trackAlias, groupID uint64, publisherPriority uint8) (*groupHeaderStream, error) {
shgm := &wire.StreamHeaderGroupMessage{
SubscribeID: subscribeID,
TrackAlias: trackAlias,
GroupID: groupID,
ObjectSendOrder: objectSendOrder,
SubscribeID: subscribeID,
TrackAlias: trackAlias,
GroupID: groupID,
PublisherPriority: publisherPriority,
}
buf := make([]byte, 0, 40)
buf = shgm.Append(buf)
Expand Down
8 changes: 4 additions & 4 deletions integrationtests/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func TestIntegration(t *testing.T) {
err = track.WriteObject(ctx, moqtransport.Object{
GroupID: 0,
ObjectID: 0,
ObjectSendOrder: 0,
PublisherPriority: 0,
ForwardingPreference: 0,
Payload: []byte("hello world"),
})
Expand Down Expand Up @@ -252,7 +252,7 @@ func TestIntegration(t *testing.T) {
err = track.WriteObject(ctx, moqtransport.Object{
GroupID: 0,
ObjectID: 0,
ObjectSendOrder: 0,
PublisherPriority: 0,
ForwardingPreference: 0,
Payload: []byte("hello world"),
})
Expand All @@ -262,7 +262,7 @@ func TestIntegration(t *testing.T) {
err = track.WriteObject(ctx, moqtransport.Object{
GroupID: 0,
ObjectID: 0,
ObjectSendOrder: 0,
PublisherPriority: 0,
ForwardingPreference: 0,
Payload: []byte("hello world"),
})
Expand All @@ -272,7 +272,7 @@ func TestIntegration(t *testing.T) {
err = track.WriteObject(ctx, moqtransport.Object{
GroupID: 0,
ObjectID: 0,
ObjectSendOrder: 0,
PublisherPriority: 0,
ForwardingPreference: 0,
Payload: []byte("hello world"),
})
Expand Down
1 change: 1 addition & 0 deletions internal/wire/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ var (
errInvalidFilterType = errors.New("invalid filter type")
errDuplicateParameter = errors.New("duplicated parameter")
errInvalidContentExistsByte = errors.New("invalid use of ContentExists byte")
errInvalidGroupOrder = errors.New("invalid GroupOrder")
)
30 changes: 16 additions & 14 deletions internal/wire/object_message.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package wire

import (
"io"

"github.com/quic-go/quic-go/quicvarint"
)

Expand All @@ -15,14 +17,14 @@ const (
)

type ObjectMessage struct {
Type ObjectMessageType
SubscribeID uint64
TrackAlias uint64
GroupID uint64
ObjectID uint64
ObjectSendOrder uint64
ObjectStatus ObjectStatus
ObjectPayload []byte
Type ObjectMessageType
SubscribeID uint64
TrackAlias uint64
GroupID uint64
ObjectID uint64
PublisherPriority uint8
ObjectStatus ObjectStatus
ObjectPayload []byte
}

func (m *ObjectMessage) Append(buf []byte) []byte {
Expand All @@ -35,7 +37,7 @@ func (m *ObjectMessage) Append(buf []byte) []byte {
buf = quicvarint.Append(buf, m.TrackAlias)
buf = quicvarint.Append(buf, m.GroupID)
buf = quicvarint.Append(buf, m.ObjectID)
buf = quicvarint.Append(buf, m.ObjectSendOrder)
buf = append(buf, m.PublisherPriority)
buf = quicvarint.Append(buf, uint64(m.ObjectStatus))
buf = append(buf, m.ObjectPayload...)
return buf
Expand Down Expand Up @@ -67,12 +69,12 @@ func (m *ObjectMessage) parse(data []byte) (parsed int, err error) {
return
}
data = data[n:]
m.ObjectSendOrder, n, err = quicvarint.Parse(data)
parsed += n
if err != nil {
return
if len(data) == 0 {
return parsed, io.EOF
}
data = data[n:]
m.PublisherPriority = data[0]
parsed += 1
data = data[1:]
var status uint64
status, n, err = quicvarint.Parse(data)
parsed += n
Expand Down
54 changes: 27 additions & 27 deletions internal/wire/object_message_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,21 @@ type ObjectStreamParser struct {
gotHeader bool
streamType ObjectMessageType

subscribeID uint64
trackAlias uint64
objectSendOrder uint64
groupID uint64
subscribeID uint64
trackAlias uint64
publisherPriority uint8
groupID uint64
}

func NewObjectStreamParser(r io.Reader) *ObjectStreamParser {
return &ObjectStreamParser{
reader: bufio.NewReader(r),
gotHeader: false,
streamType: 0,
subscribeID: 0,
trackAlias: 0,
objectSendOrder: 0,
groupID: 0,
reader: bufio.NewReader(r),
gotHeader: false,
streamType: 0,
subscribeID: 0,
trackAlias: 0,
publisherPriority: 0,
groupID: 0,
}
}

Expand All @@ -46,15 +46,15 @@ func (p *ObjectStreamParser) Parse() (*ObjectMessage, error) {
}
p.subscribeID = shtm.SubscribeID
p.trackAlias = shtm.TrackAlias
p.objectSendOrder = shtm.ObjectSendOrder
p.publisherPriority = shtm.PublisherPriority
case StreamHeaderGroupMessageType:
shgm := &StreamHeaderGroupMessage{}
if err := shgm.parse(p.reader); err != nil {
return nil, err
}
p.subscribeID = shgm.SubscribeID
p.trackAlias = shgm.TrackAlias
p.objectSendOrder = shgm.ObjectSendOrder
p.publisherPriority = shgm.PublisherPriority
p.groupID = shgm.GroupID
}
}
Expand Down Expand Up @@ -92,13 +92,13 @@ func (p *ObjectStreamParser) Parse() (*ObjectMessage, error) {
return nil, err
}
return &ObjectMessage{
Type: StreamHeaderTrackMessageType,
SubscribeID: p.subscribeID,
TrackAlias: p.trackAlias,
GroupID: om.GroupID,
ObjectID: om.ObjectID,
ObjectSendOrder: p.objectSendOrder,
ObjectPayload: om.ObjectPayload,
Type: StreamHeaderTrackMessageType,
SubscribeID: p.subscribeID,
TrackAlias: p.trackAlias,
GroupID: om.GroupID,
ObjectID: om.ObjectID,
PublisherPriority: p.publisherPriority,
ObjectPayload: om.ObjectPayload,
}, nil

case StreamHeaderGroupMessageType:
Expand All @@ -107,13 +107,13 @@ func (p *ObjectStreamParser) Parse() (*ObjectMessage, error) {
return nil, err
}
return &ObjectMessage{
Type: StreamHeaderGroupMessageType,
SubscribeID: p.subscribeID,
TrackAlias: p.trackAlias,
GroupID: p.groupID,
ObjectID: om.ObjectID,
ObjectSendOrder: p.objectSendOrder,
ObjectPayload: om.ObjectPayload,
Type: StreamHeaderGroupMessageType,
SubscribeID: p.subscribeID,
TrackAlias: p.trackAlias,
GroupID: p.groupID,
ObjectID: om.ObjectID,
PublisherPriority: p.publisherPriority,
ObjectPayload: om.ObjectPayload,
}, nil
}
return nil, errInvalidMessageType
Expand Down
64 changes: 32 additions & 32 deletions internal/wire/object_message_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,26 @@ func TestObjectStreamParser(t *testing.T) {
mr: &mockReader{
reads: [][]byte{
(&ObjectMessage{
Type: 0,
SubscribeID: 0,
TrackAlias: 0,
GroupID: 0,
ObjectID: 0,
ObjectSendOrder: 0,
ObjectPayload: []byte{},
Type: 0,
SubscribeID: 0,
TrackAlias: 0,
GroupID: 0,
ObjectID: 0,
PublisherPriority: 0,
ObjectPayload: []byte{},
}).Append([]byte{}),
},
index: 0,
},
expect: []*ObjectMessage{
{
Type: 0,
SubscribeID: 0,
TrackAlias: 0,
GroupID: 0,
ObjectID: 0,
ObjectSendOrder: 0,
ObjectPayload: []byte{},
Type: 0,
SubscribeID: 0,
TrackAlias: 0,
GroupID: 0,
ObjectID: 0,
PublisherPriority: 0,
ObjectPayload: []byte{},
},
},
err: io.EOF,
Expand All @@ -54,10 +54,10 @@ func TestObjectStreamParser(t *testing.T) {
mr: &mockReader{
reads: [][]byte{
(&StreamHeaderGroupMessage{
SubscribeID: 0,
TrackAlias: 0,
GroupID: 0,
ObjectSendOrder: 0,
SubscribeID: 0,
TrackAlias: 0,
GroupID: 0,
PublisherPriority: 0,
}).Append([]byte{}),
(&StreamHeaderGroupObject{
ObjectID: 0,
Expand All @@ -72,22 +72,22 @@ func TestObjectStreamParser(t *testing.T) {
},
expect: []*ObjectMessage{
{
Type: StreamHeaderGroupMessageType,
SubscribeID: 0,
TrackAlias: 0,
GroupID: 0,
ObjectID: 0,
ObjectSendOrder: 0,
ObjectPayload: []byte{0x00},
Type: StreamHeaderGroupMessageType,
SubscribeID: 0,
TrackAlias: 0,
GroupID: 0,
ObjectID: 0,
PublisherPriority: 0,
ObjectPayload: []byte{0x00},
},
{
Type: StreamHeaderGroupMessageType,
SubscribeID: 0,
TrackAlias: 0,
GroupID: 0,
ObjectID: 1,
ObjectSendOrder: 0,
ObjectPayload: []byte{0x01},
Type: StreamHeaderGroupMessageType,
SubscribeID: 0,
TrackAlias: 0,
GroupID: 0,
ObjectID: 1,
PublisherPriority: 0,
ObjectPayload: []byte{0x01},
},
},
err: io.EOF,
Expand Down
Loading

0 comments on commit 0c18f3a

Please sign in to comment.