Skip to content

Commit 7107508

Browse files
authored
Merge pull request #1669 from hnws/master
feat(nest): add retry logic for 429 and 409 errors with exponential backoff
2 parents e1577b5 + cd2f90a commit 7107508

File tree

2 files changed

+169
-77
lines changed

2 files changed

+169
-77
lines changed

pkg/nest/api.go

Lines changed: 94 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -166,42 +166,100 @@ func (a *API) ExchangeSDP(projectID, deviceID, offer string) (string, error) {
166166

167167
uri := "https://smartdevicemanagement.googleapis.com/v1/enterprises/" +
168168
projectID + "/devices/" + deviceID + ":executeCommand"
169-
req, err := http.NewRequest("POST", uri, bytes.NewReader(b))
170-
if err != nil {
171-
return "", err
172-
}
173169

174-
req.Header.Set("Authorization", "Bearer "+a.Token)
170+
maxRetries := 3
171+
retryDelay := time.Second * 30
175172

176-
client := &http.Client{Timeout: time.Second * 5000}
177-
res, err := client.Do(req)
178-
if err != nil {
179-
return "", err
173+
for attempt := 0; attempt < maxRetries; attempt++ {
174+
req, err := http.NewRequest("POST", uri, bytes.NewReader(b))
175+
if err != nil {
176+
return "", err
177+
}
178+
179+
req.Header.Set("Authorization", "Bearer "+a.Token)
180+
181+
client := &http.Client{Timeout: time.Second * 5000}
182+
res, err := client.Do(req)
183+
if err != nil {
184+
return "", err
185+
}
186+
187+
// Handle 409 (Conflict), 429 (Too Many Requests), and 401 (Unauthorized)
188+
if res.StatusCode == 409 || res.StatusCode == 429 || res.StatusCode == 401 {
189+
res.Body.Close()
190+
if attempt < maxRetries-1 {
191+
// Get new token from Google
192+
if err := a.refreshToken(); err != nil {
193+
return "", err
194+
}
195+
time.Sleep(retryDelay)
196+
retryDelay *= 2 // exponential backoff
197+
continue
198+
}
199+
}
200+
201+
defer res.Body.Close()
202+
203+
if res.StatusCode != 200 {
204+
return "", errors.New("nest: wrong status: " + res.Status)
205+
}
206+
207+
var resv struct {
208+
Results struct {
209+
Answer string `json:"answerSdp"`
210+
ExpiresAt time.Time `json:"expiresAt"`
211+
MediaSessionID string `json:"mediaSessionId"`
212+
} `json:"results"`
213+
}
214+
215+
if err = json.NewDecoder(res.Body).Decode(&resv); err != nil {
216+
return "", err
217+
}
218+
219+
a.StreamProjectID = projectID
220+
a.StreamDeviceID = deviceID
221+
a.StreamSessionID = resv.Results.MediaSessionID
222+
a.StreamExpiresAt = resv.Results.ExpiresAt
223+
224+
return resv.Results.Answer, nil
180225
}
181-
defer res.Body.Close()
182226

183-
if res.StatusCode != 200 {
184-
return "", errors.New("nest: wrong status: " + res.Status)
227+
return "", errors.New("nest: max retries exceeded")
228+
}
229+
230+
func (a *API) refreshToken() error {
231+
// Get the cached API with matching token to get credentials
232+
var refreshKey string
233+
cacheMu.Lock()
234+
for key, api := range cache {
235+
if api.Token == a.Token {
236+
refreshKey = key
237+
break
238+
}
185239
}
240+
cacheMu.Unlock()
186241

187-
var resv struct {
188-
Results struct {
189-
Answer string `json:"answerSdp"`
190-
ExpiresAt time.Time `json:"expiresAt"`
191-
MediaSessionID string `json:"mediaSessionId"`
192-
} `json:"results"`
242+
if refreshKey == "" {
243+
return errors.New("nest: unable to find cached credentials")
193244
}
194245

195-
if err = json.NewDecoder(res.Body).Decode(&resv); err != nil {
196-
return "", err
246+
// Parse credentials from cache key
247+
parts := strings.Split(refreshKey, ":")
248+
if len(parts) != 3 {
249+
return errors.New("nest: invalid cache key format")
197250
}
251+
clientID, clientSecret, refreshToken := parts[0], parts[1], parts[2]
198252

199-
a.StreamProjectID = projectID
200-
a.StreamDeviceID = deviceID
201-
a.StreamSessionID = resv.Results.MediaSessionID
202-
a.StreamExpiresAt = resv.Results.ExpiresAt
253+
// Get new API instance which will refresh the token
254+
newAPI, err := NewAPI(clientID, clientSecret, refreshToken)
255+
if err != nil {
256+
return err
257+
}
203258

204-
return resv.Results.Answer, nil
259+
// Update current API with new token
260+
a.Token = newAPI.Token
261+
a.ExpiresAt = newAPI.ExpiresAt
262+
return nil
205263
}
206264

207265
func (a *API) ExtendStream() error {
@@ -407,20 +465,22 @@ type Device struct {
407465
}
408466

409467
func (a *API) StartExtendStreamTimer() {
410-
// Calculate the duration until 30 seconds before the stream expires
411-
duration := time.Until(a.StreamExpiresAt.Add(-30 * time.Second))
412-
a.extendTimer = time.AfterFunc(duration, func() {
468+
if a.extendTimer != nil {
469+
return
470+
}
471+
472+
a.extendTimer = time.NewTimer(time.Until(a.StreamExpiresAt) - time.Minute)
473+
go func() {
474+
<-a.extendTimer.C
413475
if err := a.ExtendStream(); err != nil {
414476
return
415477
}
416-
duration = time.Until(a.StreamExpiresAt.Add(-30 * time.Second))
417-
a.extendTimer.Reset(duration)
418-
})
478+
}()
419479
}
420480

421481
func (a *API) StopExtendStreamTimer() {
422-
if a.extendTimer == nil {
423-
return
482+
if a.extendTimer != nil {
483+
a.extendTimer.Stop()
484+
a.extendTimer = nil
424485
}
425-
a.extendTimer.Stop()
426486
}

pkg/nest/client.go

Lines changed: 75 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"errors"
55
"net/url"
66
"strings"
7+
"time"
78

89
"github.com/AlexxIT/go2rtc/pkg/core"
910
"github.com/AlexxIT/go2rtc/pkg/rtsp"
@@ -38,9 +39,26 @@ func Dial(rawURL string) (core.Producer, error) {
3839
return nil, errors.New("nest: wrong query")
3940
}
4041

41-
nestAPI, err := NewAPI(cliendID, cliendSecret, refreshToken)
42-
if err != nil {
43-
return nil, err
42+
maxRetries := 3
43+
retryDelay := time.Second * 30
44+
45+
var nestAPI *API
46+
var lastErr error
47+
48+
for attempt := 0; attempt < maxRetries; attempt++ {
49+
nestAPI, err = NewAPI(cliendID, cliendSecret, refreshToken)
50+
if err == nil {
51+
break
52+
}
53+
lastErr = err
54+
if attempt < maxRetries-1 {
55+
time.Sleep(retryDelay)
56+
retryDelay *= 2 // exponential backoff
57+
}
58+
}
59+
60+
if nestAPI == nil {
61+
return nil, lastErr
4462
}
4563

4664
protocols := strings.Split(query.Get("protocols"), ",")
@@ -79,48 +97,62 @@ func (c *WebRTCClient) MarshalJSON() ([]byte, error) {
7997
}
8098

8199
func rtcConn(nestAPI *API, rawURL, projectID, deviceID string) (*WebRTCClient, error) {
82-
rtcAPI, err := webrtc.NewAPI()
83-
if err != nil {
84-
return nil, err
85-
}
86-
87-
conf := pion.Configuration{}
88-
pc, err := rtcAPI.NewPeerConnection(conf)
89-
if err != nil {
90-
return nil, err
91-
}
92-
93-
conn := webrtc.NewConn(pc)
94-
conn.FormatName = "nest/webrtc"
95-
conn.Mode = core.ModeActiveProducer
96-
conn.Protocol = "http"
97-
conn.URL = rawURL
98-
99-
// https://developers.google.com/nest/device-access/traits/device/camera-live-stream#generatewebrtcstream-request-fields
100-
medias := []*core.Media{
101-
{Kind: core.KindAudio, Direction: core.DirectionRecvonly},
102-
{Kind: core.KindVideo, Direction: core.DirectionRecvonly},
103-
{Kind: "app"}, // important for Nest
104-
}
105-
106-
// 3. Create offer with candidates
107-
offer, err := conn.CreateCompleteOffer(medias)
108-
if err != nil {
109-
return nil, err
110-
}
111-
112-
// 4. Exchange SDP via Hass
113-
answer, err := nestAPI.ExchangeSDP(projectID, deviceID, offer)
114-
if err != nil {
115-
return nil, err
116-
}
117-
118-
// 5. Set answer with remote medias
119-
if err = conn.SetAnswer(answer); err != nil {
120-
return nil, err
100+
maxRetries := 3
101+
retryDelay := time.Second * 30
102+
var lastErr error
103+
104+
for attempt := 0; attempt < maxRetries; attempt++ {
105+
rtcAPI, err := webrtc.NewAPI()
106+
if err != nil {
107+
return nil, err
108+
}
109+
110+
conf := pion.Configuration{}
111+
pc, err := rtcAPI.NewPeerConnection(conf)
112+
if err != nil {
113+
return nil, err
114+
}
115+
116+
conn := webrtc.NewConn(pc)
117+
conn.FormatName = "nest/webrtc"
118+
conn.Mode = core.ModeActiveProducer
119+
conn.Protocol = "http"
120+
conn.URL = rawURL
121+
122+
// https://developers.google.com/nest/device-access/traits/device/camera-live-stream#generatewebrtcstream-request-fields
123+
medias := []*core.Media{
124+
{Kind: core.KindAudio, Direction: core.DirectionRecvonly},
125+
{Kind: core.KindVideo, Direction: core.DirectionRecvonly},
126+
{Kind: "app"}, // important for Nest
127+
}
128+
129+
// 3. Create offer with candidates
130+
offer, err := conn.CreateCompleteOffer(medias)
131+
if err != nil {
132+
return nil, err
133+
}
134+
135+
// 4. Exchange SDP via Hass
136+
answer, err := nestAPI.ExchangeSDP(projectID, deviceID, offer)
137+
if err != nil {
138+
lastErr = err
139+
if attempt < maxRetries-1 {
140+
time.Sleep(retryDelay)
141+
retryDelay *= 2
142+
continue
143+
}
144+
return nil, err
145+
}
146+
147+
// 5. Set answer with remote medias
148+
if err = conn.SetAnswer(answer); err != nil {
149+
return nil, err
150+
}
151+
152+
return &WebRTCClient{conn: conn, api: nestAPI}, nil
121153
}
122154

123-
return &WebRTCClient{conn: conn, api: nestAPI}, nil
155+
return nil, lastErr
124156
}
125157

126158
func rtspConn(nestAPI *API, rawURL, projectID, deviceID string) (*RTSPClient, error) {

0 commit comments

Comments
 (0)