Skip to content

Commit

Permalink
Replace gRPC with TCP
Browse files Browse the repository at this point in the history
Didn't need to be that complicated
  • Loading branch information
Ian McIntyre committed Feb 13, 2018
1 parent cf93ec7 commit 6139894
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 343 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# Craas

Card reading as a service. Craas presents a gRPC service, `CardReader`, that publishes card read events to gRPC subscribers.
Card reading as a service. Craas presents a TCP server that publishes card read events to TCP subscribers.

- [X] client debugging. `craas -testing` sets up a REPL so that you can publish arbitrary card events from the command line. Useful for testing your subscriber implementation
- [ ] read card events from a serial device
- [X] client debugging. `craas -testing` sets up a REPL so that you can publish arbitrary card events from the command line. Useful for testing your subscriber implementation without having a card reader.
- [ ] read card events from a serial device.

## Usage

Expand Down
77 changes: 37 additions & 40 deletions craas.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
/*Package craas implements a card reader gRPC service.*/
/*Package craas implements a card reader TCP service.*/
package main

import (
"bytes"
"flag"
"fmt"
"io"
Expand All @@ -10,9 +11,6 @@ import (
"os"
"sync"
"time"

pb "github.com/IQ-Inc/craas/craasbuf"
"google.golang.org/grpc"
)

var (
Expand Down Expand Up @@ -43,71 +41,66 @@ func validateFlags() bool {

// cardAuth hodls the reader of card events.
// It pushes reader messages to the subscribers
type cardAuth struct {
type publisher struct {
sync.Mutex // lock em down
rdr io.Reader // source of card events
subscribers []chan []byte // things that are interested in the card events
}

func (ca *cardAuth) GetCardEvents(_ *pb.CardRequest, stream pb.CardReader_GetCardEventsServer) error {
func (pub *publisher) handle(conn io.WriteCloser) {
c := make(chan []byte)
idx := -1
defer conn.Close()

// Add the new client channel into
// the cardAuth struct
func() {
ca.Lock()
defer ca.Unlock()

idx = len(ca.subscribers)
ca.subscribers = append(ca.subscribers, c)
pub.Lock()
defer pub.Unlock()
pub.subscribers = append(pub.subscribers, c)
}()

// Listen for messages on the channel
for msg := range c {
resp := &pb.CardResponse{
Card: &pb.Card{
Id: string(msg),
},
for {
// Listen for messages on the channel
msg, closed := <-c
if closed {
return
}

if err := stream.Send(resp); err != nil {
log.Println("client send error:", err)
return err
}
buf := bytes.NewBuffer(msg)
io.Copy(conn, buf)
}

return nil
}

// publish moves buffers from the reader into the
// subscriber's channels
func publish(ca *cardAuth) {
func (pub *publisher) publish() {
bs := [256]byte{}
for {
n, err := ca.rdr.Read(bs[:])
n, err := pub.rdr.Read(bs[:])

if err != nil {
log.Println("Error on reader:", err)
log.Println("The service is shutting down")

func() {
ca.Lock()
defer ca.Unlock()
for _, sub := range ca.subscribers {
pub.Lock()
defer pub.Unlock()
for _, sub := range pub.subscribers {
close(sub)
}
}()
return

os.Exit(1)
}

func() {
ca.Lock()
defer ca.Unlock()
pub.Lock()
defer pub.Unlock()

removals := make([]int, 0)

for idx, sub := range ca.subscribers {
for idx, sub := range pub.subscribers {
select {
case sub <- bs[:n]:
continue
Expand All @@ -119,9 +112,9 @@ func publish(ca *cardAuth) {
}

for removal := range removals {
sub := ca.subscribers[removal]
sub := pub.subscribers[removal]
close(sub)
ca.subscribers = append(ca.subscribers[:removal], ca.subscribers[removal+1:]...)
pub.subscribers = append(pub.subscribers[:removal], pub.subscribers[removal+1:]...)
}

}()
Expand All @@ -138,17 +131,21 @@ func main() {
log.Fatalln(err)
}

log.Println("gRPC service started at", *flagport)
log.Println("TCP service started at", *flagport)

var rdr io.Reader
if *flagtest {
rdr = newRepl(">>")
}

grpcServer := grpc.NewServer()
ca := &cardAuth{rdr: rdr, subscribers: []chan []byte{}}
go publish(ca)
pub := &publisher{rdr: rdr, subscribers: []chan []byte{}}
go pub.publish()

pb.RegisterCardReaderServer(grpcServer, ca)
grpcServer.Serve(lis)
for {
conn, err := lis.Accept()
if err != nil {
log.Fatalln("error accepting connection:", err)
go pub.handle(conn)
}
}
}
219 changes: 0 additions & 219 deletions craasbuf/cardservice.pb.go

This file was deleted.

Loading

0 comments on commit 6139894

Please sign in to comment.