package main import ( "context" "flag" "log" "math/rand" "net/http" "sync" // "github.com/improbable-eng/grpc-web/go/grpcweb" _ "net/http/pprof" pb "git.capella.pro/thepaint/protos" "google.golang.org/protobuf/proto" "encoding/base64" "encoding/json" "github.com/gorilla/websocket" "github.com/pion/webrtc/v3" "github.com/sirupsen/logrus" ) var ( tls = flag.Bool("tls", false, "Connection uses TLS if true, else plain TCP") certFile = flag.String("cert_file", "", "The TLS cert file") keyFile = flag.String("key_file", "", "The TLS key file") jsonDBFile = flag.String("json_db_file", "", "A json file containing a list of features") port = flag.Int("port", 8080, "The server port") ) var upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } type server struct { pb.UnimplementedPaintClientServer // savedFeatures []*pb.Feature // read-only after initialized // // mu sync.Mutex // protects routeNotes // routeNotes map[string][]*pb.RouteNote } func main() { http.HandleFunc("/ws", websocketHandler) logrus.Fatal(http.ListenAndServe(":8081", nil)) } func (s *server) Paint(ctx context.Context, request *pb.PaintRequest) (*pb.PaintReply, error) { panic("implement me") } func (s *server) Monitor(sub *pb.MonitorSub, monitorServer pb.PaintClient_MonitorServer) error { // for k := 0; k < 4; k++ { // points := []*pb.MonitorPoint{} // for i := 64*k; i < 64*(k+1); i++ { // for j := 0; j < 256; j++ { // points = append(points, &pb.MonitorPoint{ // Point: &pb.BPoint{ // X: int32(i), // Y: int32(j), // }, // Color: &pb.BColor{ // Rgba: uint32(rand.Intn(256)) | uint32(rand.Intn(256))<<8 | uint32(rand.Intn(256))<<16 | 255<<24, // }, // }) // } // } // // monitorServer.Send(&pb.MonitorReply{ // Points: points, // // BytesPoints: b.Bytes(), // }) // } points := Getp(0, 0) monitorServer.Send(&pb.MonitorReply{ Points: points, // BytesPoints: b.Bytes(), }) return nil } func Getp(ii int, jj int) []*pb.MonitorPoint { points := []*pb.MonitorPoint{} for i := ii * 32; i < 32*(1+ii); i++ { for j := jj * 32; j < 32*(1+jj); j++ { points = append(points, &pb.MonitorPoint{ Point: &pb.BPoint{ X: int32(i), Y: int32(j), }, Color: &pb.BColor{ Rgba: uint32(rand.Intn(256)) | uint32(rand.Intn(256))<<8 | uint32(rand.Intn(256))<<16 | 255<<24, }, }) } } return points } func Encode(obj interface{}) string { b, err := json.Marshal(obj) if err != nil { panic(err) } return base64.StdEncoding.EncodeToString(b) } // Handle incoming websockets func websocketHandler(w http.ResponseWriter, r *http.Request) { // Upgrade HTTP request to Websocket unsafeConn, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Print("upgrade:", err) return } c := &threadSafeWriter{unsafeConn, sync.Mutex{}} logrus.Info("NEW GRPC") // Configure and create a new PeerConnection. config := webrtc.Configuration{ ICEServers: []webrtc.ICEServer{ { URLs: []string{"stun:melvans.com:3478"}, Username: "b84e4232f84505e3c3967184df374f2cdaa954f1", Credential: "afc39ae0591c132292f73a87f6f3725311fbafa0", // URLs: []string{"stun:stun.l.google.com:19302"}, }, }, } pc, err := webrtc.NewPeerConnection(config) if err != nil { logrus.Error(err) } // Create DataChannel. sendChannel, err := pc.CreateDataChannel("data", nil) if err != nil { logrus.Error(err) } sendChannel.OnClose(func() { logrus.Println("sendChannel has closed") }) sendChannel.OnOpen(func() { logrus.Println("sendChannel has opened") for i := 0; i < 8; i++ { for j := 0; j < 8; j++ { data := &pb.MonitorReply{ Points: Getp(i, j), } md, err := proto.Marshal(data) if err != nil { logrus.Error(err) continue } err = sendChannel.Send(md) if err != nil { logrus.Error(err) continue } } } }) sendChannel.OnMessage(func(msg webrtc.DataChannelMessage) { logrus.Println(string(msg.Data)) }) // Create offer offer, err := pc.CreateOffer(nil) if err != nil { logrus.Error(err) } if err := pc.SetLocalDescription(offer); err != nil { logrus.Error(err) } // Add handlers for setting up the connection. pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { logrus.Println(state) }) // connectionString := make(chan bool) // Trickle ICE. Emit server candidate to client pc.OnICECandidate(func(i *webrtc.ICECandidate) { if i == nil { return } data := &pb.RTCData{ Event: string(pc.LocalDescription().Type.String()), Data: string(pc.LocalDescription().SDP), } logrus.WithField("data", data).Info("data to client") md, _ := proto.Marshal(data) w, _ := unsafeConn.NextWriter(websocket.BinaryMessage) w.Write(md) w.Close() }) for { _, raw, err := c.ReadMessage() data := &pb.RTCData{} if err != nil { logrus.Error(err) return } else if err := proto.Unmarshal(raw, data); err != nil { logrus.WithField("data", string(raw)).Error(err) return } logrus.WithField("data", data).Info("data from client") switch data.Event { case "candidate": candidate := webrtc.ICECandidateInit{} if err := json.Unmarshal([]byte(data.Data), &candidate); err != nil { logrus.Error(err) return } if err := pc.AddICECandidate(candidate); err != nil { logrus.Error(err) return } case "answer": answer := webrtc.SessionDescription{ Type: webrtc.NewSDPType(data.Event), SDP: data.Data, } if err := pc.SetRemoteDescription(answer); err != nil { logrus.Error(err) return } } } } type threadSafeWriter struct { *websocket.Conn sync.Mutex } func (t *threadSafeWriter) WriteJSON(v interface{}) error { t.Lock() defer t.Unlock() return t.Conn.WriteJSON(v) }