package main import ( "log" "math/rand" "net/http" "sync" // "github.com/improbable-eng/grpc-web/go/grpcweb" _ "net/http/pprof" pb "git.capella.pro/thepaint/protos" "google.golang.org/protobuf/proto" "encoding/base64" "encoding/json" "github.com/gorilla/websocket" "github.com/pion/webrtc/v3" "github.com/sirupsen/logrus" ) var upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } func main() { http.HandleFunc("/ws", websocketHandler) logrus.Fatal(http.ListenAndServe(":8081", nil)) } func Getp(ii int, jj int) []*pb.MonitorPoint { points := []*pb.MonitorPoint{} for i := ii * 32; i < 32*(1+ii); i++ { for j := jj * 32; j < 32*(1+jj); j++ { points = append(points, &pb.MonitorPoint{ Point: &pb.BPoint{ X: int32(i), Y: int32(j), }, Color: &pb.BColor{ Rgba: uint32(rand.Intn(256)) | uint32(rand.Intn(256))<<8 | uint32(rand.Intn(256))<<16 | 255<<24, }, }) } } return points } func Encode(obj interface{}) string { b, err := json.Marshal(obj) if err != nil { panic(err) } return base64.StdEncoding.EncodeToString(b) } // Handle incoming websockets func websocketHandler(w http.ResponseWriter, r *http.Request) { // Upgrade HTTP request to Websocket unsafeConn, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Print("upgrade:", err) return } c := &threadSafeWriter{unsafeConn, sync.Mutex{}} logrus.Info("NEW GRPC") // Configure and create a new PeerConnection. config := webrtc.Configuration{ ICEServers: []webrtc.ICEServer{ { URLs: []string{"stun:melvans.com:3478"}, Username: "b84e4232f84505e3c3967184df374f2cdaa954f1", Credential: "afc39ae0591c132292f73a87f6f3725311fbafa0", // URLs: []string{"stun:stun.l.google.com:19302"}, }, }, } pc, err := webrtc.NewPeerConnection(config) if err != nil { logrus.Error(err) } // Create DataChannel. f := false sendChannel, err := pc.CreateDataChannel("data", &webrtc.DataChannelInit{ Ordered: &f, }) if err != nil { logrus.Error(err) } sendChannel.OnClose(func() { logrus.Println("sendChannel has closed") }) sendChannel.OnOpen(func() { logrus.Println("sendChannel has opened") }) sendChannel.OnMessage(func(msg webrtc.DataChannelMessage) { data := &pb.WebRTCRequest{} err := proto.Unmarshal(msg.Data, data) if err != nil { logrus.WithField("data", data).Error(err) return } logrus.WithField("data", data).Info("req") if data.GetRequestData() != nil { for i := 0; i < 8; i++ { for j := 0; j < 8; j++ { data := &pb.MonitorReply{ Points: Getp(i, j), } md, err := proto.Marshal(data) if err != nil { logrus.Error(err) continue } logrus.Info(len(md)) sendChannel.Send(md) // err = sendChannel.Send(md) // if err != nil { // logrus.Error(err) // continue // } } } } }) // Create a video track videoTrack, err := webrtc.NewTrackLocalStaticRTP(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8}, "video", "pion") if err != nil { panic(err) } _, err = pc.AddTrack(videoTrack) if err != nil { panic(err) } // Create offer offer, err := pc.CreateOffer(nil) if err != nil { logrus.Error(err) } if err := pc.SetLocalDescription(offer); err != nil { logrus.Error(err) } // Add handlers for setting up the connection. pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { logrus.Println(state) }) // connectionString := make(chan bool) // Trickle ICE. Emit server candidate to client pc.OnICECandidate(func(i *webrtc.ICECandidate) { if i == nil { return } data := &pb.RTCData{ Event: string(pc.LocalDescription().Type.String()), Data: string(pc.LocalDescription().SDP), } logrus.WithField("data", data).Info("data to client") md, _ := proto.Marshal(data) w, _ := unsafeConn.NextWriter(websocket.BinaryMessage) w.Write(md) w.Close() }) for { _, raw, err := c.ReadMessage() data := &pb.RTCData{} if err != nil { logrus.Error(err) return } else if err := proto.Unmarshal(raw, data); err != nil { logrus.WithField("data", string(raw)).Error(err) return } logrus.WithField("data", data).Info("data from client") switch data.Event { case "candidate": candidate := webrtc.ICECandidateInit{} if err := json.Unmarshal([]byte(data.Data), &candidate); err != nil { logrus.Error(err) return } if err := pc.AddICECandidate(candidate); err != nil { logrus.Error(err) return } case "answer": answer := webrtc.SessionDescription{ Type: webrtc.NewSDPType(data.Event), SDP: data.Data, } if err := pc.SetRemoteDescription(answer); err != nil { logrus.Error(err) return } } } } type threadSafeWriter struct { *websocket.Conn sync.Mutex } func (t *threadSafeWriter) WriteJSON(v interface{}) error { t.Lock() defer t.Unlock() return t.Conn.WriteJSON(v) }