Skip to content

Commit bfd2656

Browse files
committed
Add flussonic source #1678
1 parent be3a1c5 commit bfd2656

File tree

3 files changed

+188
-0
lines changed

3 files changed

+188
-0
lines changed

internal/flussonic/flussonic.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package flussonic
2+
3+
import (
4+
"github.com/AlexxIT/go2rtc/internal/streams"
5+
"github.com/AlexxIT/go2rtc/pkg/flussonic"
6+
)
7+
8+
func Init() {
9+
streams.HandleFunc("flussonic", flussonic.Dial)
10+
}

main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/AlexxIT/go2rtc/internal/exec"
1313
"github.com/AlexxIT/go2rtc/internal/expr"
1414
"github.com/AlexxIT/go2rtc/internal/ffmpeg"
15+
"github.com/AlexxIT/go2rtc/internal/flussonic"
1516
"github.com/AlexxIT/go2rtc/internal/gopro"
1617
"github.com/AlexxIT/go2rtc/internal/hass"
1718
"github.com/AlexxIT/go2rtc/internal/hls"
@@ -88,6 +89,7 @@ func main() {
8889
gopro.Init() // gopro source
8990
doorbird.Init() // doorbird source
9091
v4l2.Init() // v4l2 source
92+
flussonic.Init()
9193

9294
// 6. Helper modules
9395

pkg/flussonic/flussonic.go

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
package flussonic
2+
3+
import (
4+
"strings"
5+
6+
"github.com/AlexxIT/go2rtc/pkg/aac"
7+
"github.com/AlexxIT/go2rtc/pkg/core"
8+
"github.com/AlexxIT/go2rtc/pkg/h264"
9+
"github.com/AlexxIT/go2rtc/pkg/iso"
10+
"github.com/gorilla/websocket"
11+
"github.com/pion/rtp"
12+
)
13+
14+
type Producer struct {
15+
core.Connection
16+
conn *websocket.Conn
17+
18+
videoTrackID, audioTrackID uint32
19+
videoTimeScale, audioTimeScale float32
20+
}
21+
22+
func Dial(source string) (core.Producer, error) {
23+
url, _ := strings.CutPrefix(source, "flussonic:")
24+
conn, _, err := websocket.DefaultDialer.Dial(url, nil)
25+
if err != nil {
26+
return nil, err
27+
}
28+
29+
prod := &Producer{
30+
Connection: core.Connection{
31+
ID: core.NewID(),
32+
FormatName: "flussonic",
33+
Protocol: core.Before(url, ":"), // wss
34+
RemoteAddr: conn.RemoteAddr().String(),
35+
URL: url,
36+
Transport: conn,
37+
},
38+
conn: conn,
39+
}
40+
41+
if err = prod.probe(); err != nil {
42+
_ = conn.Close()
43+
return nil, err
44+
}
45+
46+
return prod, nil
47+
}
48+
49+
func (p *Producer) probe() error {
50+
var init struct {
51+
//Metadata struct {
52+
// Tracks []struct {
53+
// Width int `json:"width,omitempty"`
54+
// Height int `json:"height,omitempty"`
55+
// Fps int `json:"fps,omitempty"`
56+
// Content string `json:"content"`
57+
// TrackId string `json:"trackId"`
58+
// Bitrate int `json:"bitrate"`
59+
// } `json:"tracks"`
60+
//} `json:"metadata"`
61+
Tracks []struct {
62+
Content string `json:"content"`
63+
Id uint32 `json:"id"`
64+
Payload []byte `json:"payload"`
65+
} `json:"tracks"`
66+
//Type string `json:"type"`
67+
}
68+
69+
if err := p.conn.ReadJSON(&init); err != nil {
70+
return err
71+
}
72+
73+
var timeScale uint32
74+
75+
for _, track := range init.Tracks {
76+
atoms, _ := iso.DecodeAtoms(track.Payload)
77+
for _, atom := range atoms {
78+
switch atom := atom.(type) {
79+
case *iso.AtomMdhd:
80+
timeScale = atom.TimeScale
81+
case *iso.AtomVideo:
82+
switch atom.Name {
83+
case "avc1":
84+
codec := h264.AVCCToCodec(atom.Config)
85+
p.Medias = append(p.Medias, &core.Media{
86+
Kind: core.KindVideo,
87+
Direction: core.DirectionRecvonly,
88+
Codecs: []*core.Codec{codec},
89+
})
90+
p.videoTrackID = track.Id
91+
p.videoTimeScale = float32(codec.ClockRate) / float32(timeScale)
92+
}
93+
case *iso.AtomAudio:
94+
switch atom.Name {
95+
case "mp4a":
96+
codec := aac.ConfigToCodec(atom.Config)
97+
p.Medias = append(p.Medias, &core.Media{
98+
Kind: core.KindAudio,
99+
Direction: core.DirectionRecvonly,
100+
Codecs: []*core.Codec{codec},
101+
})
102+
p.audioTrackID = track.Id
103+
p.audioTimeScale = float32(codec.ClockRate) / float32(timeScale)
104+
}
105+
}
106+
}
107+
}
108+
109+
return nil
110+
}
111+
112+
func (p *Producer) Start() error {
113+
if err := p.conn.WriteMessage(websocket.TextMessage, []byte("resume")); err != nil {
114+
return err
115+
}
116+
117+
receivers := make(map[uint32]*core.Receiver)
118+
timeScales := make(map[uint32]float32)
119+
120+
for _, receiver := range p.Receivers {
121+
switch receiver.Codec.Kind() {
122+
case core.KindVideo:
123+
receivers[p.videoTrackID] = receiver
124+
timeScales[p.videoTrackID] = p.videoTimeScale
125+
case core.KindAudio:
126+
receivers[p.audioTrackID] = receiver
127+
timeScales[p.audioTrackID] = p.audioTimeScale
128+
}
129+
}
130+
131+
ch := make(chan []byte, 10)
132+
defer close(ch)
133+
134+
go func() {
135+
for b := range ch {
136+
atoms, err := iso.DecodeAtoms(b)
137+
if err != nil {
138+
continue
139+
}
140+
141+
var trackID uint32
142+
var decodeTime uint64
143+
144+
for _, atom := range atoms {
145+
switch atom := atom.(type) {
146+
case *iso.AtomTfhd:
147+
trackID = atom.TrackID
148+
case *iso.AtomTfdt:
149+
decodeTime = atom.DecodeTime
150+
case *iso.AtomMdat:
151+
b = atom.Data
152+
}
153+
}
154+
155+
if recv := receivers[trackID]; recv != nil {
156+
timestamp := uint32(float32(decodeTime) * timeScales[trackID])
157+
packet := &rtp.Packet{
158+
Header: rtp.Header{Timestamp: timestamp},
159+
Payload: b,
160+
}
161+
recv.WriteRTP(packet)
162+
}
163+
}
164+
}()
165+
166+
for {
167+
mType, b, err := p.conn.ReadMessage()
168+
if err != nil {
169+
return err
170+
}
171+
if mType == websocket.BinaryMessage {
172+
p.Recv += len(b)
173+
ch <- b
174+
}
175+
}
176+
}

0 commit comments

Comments
 (0)