Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions adapters/audienceNetwork/facebook.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ func Builder(bidderName openrtb_ext.BidderName, config config.Adapter, server co
return bidder, nil
}

func (a *adapter) MakeTimeoutNotification(req *adapters.RequestData) (*adapters.RequestData, []error) {
func (a *adapter) MakeTimeoutNotification(req *adapters.RequestData) (*adapters.RequestData, error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not directly related. Updated the function signature since only a single error is used.

var (
rID string
pubID string
Expand All @@ -445,15 +445,13 @@ func (a *adapter) MakeTimeoutNotification(req *adapters.RequestData) (*adapters.
// corresponding imp's ID
rID, err = jsonparser.GetString(req.Body, "id")
if err != nil {
return &adapters.RequestData{}, []error{err}
return &adapters.RequestData{}, err
}

// The publisher ID is expected in the app object
pubID, err = jsonparser.GetString(req.Body, "app", "publisher", "id")
if err != nil {
return &adapters.RequestData{}, []error{
errors.New("path app.publisher.id not found in the request"),
}
return &adapters.RequestData{}, errors.New("path app.publisher.id not found in the request")
}

uri := fmt.Sprintf("https://www.facebook.com/audiencenetwork/nurl/?partner=%s&app=%s&auction=%s&ortb_loss_code=2", a.platformID, pubID, rID)
Expand Down
2 changes: 1 addition & 1 deletion adapters/bidder.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type TimeoutBidder interface {
//
// Do note that if MakeRequests returns multiple requests, and more than one of these times out, MakeTimeoutNotice will be called
// once for each timed out request.
MakeTimeoutNotification(req *RequestData) (*RequestData, []error)
MakeTimeoutNotification(req *RequestData) (*RequestData, error)
}

// BidderResponse wraps the server's response with the list of bids and the currency used by the bidder.
Expand Down
7 changes: 7 additions & 0 deletions analytics/agma/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"compress/gzip"
"context"
"fmt"
"io"
"net/http"
"net/url"
"time"
Expand Down Expand Up @@ -74,6 +75,12 @@ func createHttpSender(httpClient *http.Client, endpoint config.AgmaAnalyticsHttp
glog.Errorf("[agmaAnalytics] Sending request failed %v", err)
return err
}
defer func() {
if _, err := io.Copy(io.Discard, resp.Body); err != nil {
glog.Errorf("[agmaAnalytics] Draining response body failed: %v", err)
}
Copy link
Contributor Author

@SyntaxNode SyntaxNode Aug 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The response body must be fully read and closed to ensure connection reuse. Discarding the body is especially needed when the body content is ignored / not used.

resp.Body.Close()
}()

if resp.StatusCode != http.StatusOK {
glog.Errorf("[agmaAnalytics] Wrong code received %d instead of %d", resp.StatusCode, http.StatusOK)
Expand Down
18 changes: 16 additions & 2 deletions analytics/pubstack/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,38 @@ package pubstack

import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"time"

"github.com/docker/go-units"
"github.com/golang/glog"
)

func fetchConfig(client *http.Client, endpoint *url.URL) (*Configuration, error) {
res, err := client.Get(endpoint.String())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to add a timeout to this fetch while we are here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree we should. I don't know the pubstack analytics module well enough to implement it. Will reach out to pubstack to implement.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline. I will reach out to pubstack to see what a reasonable timeout would be and we can update it in another PR.

if err != nil {
return nil, err
}
defer func() {
// read the entire response body to ensure full connection reuse if there's an
// error while decoding the json
if _, err := io.Copy(io.Discard, res.Body); err != nil {
glog.Errorf("[pubstack] Draining config response body failed: %v", err)
}
res.Body.Close()
}()

if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code: %d", res.StatusCode)
}

defer res.Body.Close()
c := Configuration{}
err = json.NewDecoder(res.Body).Decode(&c)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to decode config: %w", err)
}
return &c, nil
}
Expand Down
8 changes: 7 additions & 1 deletion analytics/pubstack/eventchannel/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package eventchannel
import (
"bytes"
"fmt"
"io"
"net/http"
"net/url"
"path"
Expand All @@ -27,7 +28,12 @@ func NewHttpSender(client *http.Client, endpoint string) Sender {
if err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to add a timeout just above to the http request?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree we should. I don't know the pubstack analytics module well enough to implement it. Will reach out to pubstack to implement.

return err
}
resp.Body.Close()
defer func() {
if _, err := io.Copy(io.Discard, resp.Body); err != nil {
glog.Errorf("[pubstack] Draining sender response body failed: %v", err)
}
resp.Body.Close()
}()

if resp.StatusCode != http.StatusOK {
glog.Errorf("[pubstack] Wrong code received %d instead of %d", resp.StatusCode, http.StatusOK)
Expand Down
10 changes: 8 additions & 2 deletions currency/rate_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,20 @@ func (rc *RateConverter) fetch() (*Rates, error) {
if err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to add a timeout to the http request just above?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Good catch. I added a configurable timeout with a default of 60 seconds. In test from my machine, latency is around 30ms. Since this isn't part of the real time auction flow, happy to set this to a higher limit considering there is no limit today. Hosts can reduce if they find it necessary or we can revisit the default later if there are no concerns right now.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline. We can address this in a follow-up PR.

return nil, err
}
defer func() {
// read the entire response body to ensure full connection reuse if there's an
// invalid status code
if _, err := io.Copy(io.Discard, response.Body); err != nil {
glog.Errorf("Error draining conversion rates response body: %v", err)
}
response.Body.Close()
}()

if response.StatusCode >= 400 {
message := fmt.Sprintf("The currency rates request failed with status code %d", response.StatusCode)
return nil, &errortypes.BadServerResponse{Message: message}
}

defer response.Body.Close()

bytesJSON, err := io.ReadAll(response.Body)
if err != nil {
return nil, err
Expand Down
26 changes: 18 additions & 8 deletions exchange/bidder.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ func (bidder *BidderAdapter) doRequestImpl(ctx context.Context, req *adapters.Re
err: err,
}
}
defer httpResp.Body.Close()

respBody, err := io.ReadAll(httpResp.Body)
if err != nil {
Expand All @@ -644,7 +645,6 @@ func (bidder *BidderAdapter) doRequestImpl(ctx context.Context, req *adapters.Re
err: err,
}
}
defer httpResp.Body.Close()

if httpResp.StatusCode < 200 || httpResp.StatusCode >= 400 {
if httpResp.StatusCode >= 500 {
Expand All @@ -671,14 +671,25 @@ func (bidder *BidderAdapter) doRequestImpl(ctx context.Context, req *adapters.Re
func (bidder *BidderAdapter) doTimeoutNotification(timeoutBidder adapters.TimeoutBidder, req *adapters.RequestData, logger util.LogMsg) {
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
toReq, errL := timeoutBidder.MakeTimeoutNotification(req)
if toReq != nil && len(errL) == 0 {

toReq, errMakeTimeoutNotification := timeoutBidder.MakeTimeoutNotification(req)
if toReq != nil && errMakeTimeoutNotification == nil {
httpReq, err := http.NewRequest(toReq.Method, toReq.Uri, bytes.NewBuffer(toReq.Body))
if err == nil {
httpReq.Header = req.Headers
httpResp, err := ctxhttp.Do(ctx, bidder.Client, httpReq)
if err == nil {
defer func() {
if _, err := io.Copy(io.Discard, httpResp.Body); err != nil {
glog.Errorf("TimeoutNotification: Draining response body failed %v", err)
}
httpResp.Body.Close()
}()
}

success := (err == nil && httpResp.StatusCode >= 200 && httpResp.StatusCode < 300)
bidder.me.RecordTimeoutNotice(success)

if bidder.config.Debug.TimeoutNotification.Log && !(bidder.config.Debug.TimeoutNotification.FailOnly && success) {
var msg string
if err == nil {
Expand All @@ -697,16 +708,15 @@ func (bidder *BidderAdapter) doTimeoutNotification(timeoutBidder adapters.Timeou
}
}
} else if bidder.config.Debug.TimeoutNotification.Log {
reqJSON, err := jsonutil.Marshal(req)
reqJSON, errMarshal := jsonutil.Marshal(req)
var msg string
if err == nil {
msg = fmt.Sprintf("TimeoutNotification: Failed to generate timeout request: error(%s), bidder request(%s)", errL[0].Error(), string(reqJSON))
if errMarshal == nil {
msg = fmt.Sprintf("TimeoutNotification: Failed to generate timeout request: error(%s), bidder request(%s)", errMarshal.Error(), string(reqJSON))
} else {
msg = fmt.Sprintf("TimeoutNotification: Failed to generate timeout request: error(%s), bidder request marshal failed(%s)", errL[0].Error(), err.Error())
msg = fmt.Sprintf("TimeoutNotification: Failed to generate timeout request: error(%s), bidder request marshal failed(%s)", errMakeTimeoutNotification.Error(), errMarshal.Error())
}
util.LogRandomSample(msg, logger, bidder.config.Debug.TimeoutNotification.SamplingRate)
}

}

type httpCallInfo struct {
Expand Down
2 changes: 1 addition & 1 deletion exchange/bidder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2473,7 +2473,7 @@ func (bidder *notifyingBidder) MakeBids(internalRequest *openrtb2.BidRequest, ex
return nil, nil
}

func (bidder *notifyingBidder) MakeTimeoutNotification(req *adapters.RequestData) (*adapters.RequestData, []error) {
func (bidder *notifyingBidder) MakeTimeoutNotification(req *adapters.RequestData) (*adapters.RequestData, error) {
return &bidder.notifyRequest, nil
}

Expand Down
13 changes: 10 additions & 3 deletions floors/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,13 +238,13 @@ func (f *PriceFloorFetcher) fetchAndValidate(config config.AccountFloorFetch) (*
}

if len(floorResp) > (config.MaxFileSizeKB * 1024) {
glog.Errorf("Recieved invalid floor data from URL: %s, reason : floor file size is greater than MaxFileSize", config.URL)
glog.Errorf("Received invalid floor data from URL: %s, reason : floor file size is greater than MaxFileSize", config.URL)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated typo spotted while editing this file.

return nil, 0
}

var priceFloors openrtb_ext.PriceFloorRules
if err = json.Unmarshal(floorResp, &priceFloors.Data); err != nil {
glog.Errorf("Recieved invalid price floor json from URL: %s", config.URL)
glog.Errorf("Received invalid price floor json from URL: %s", config.URL)
return nil, 0
}

Expand All @@ -271,6 +271,14 @@ func (f *PriceFloorFetcher) fetchFloorRulesFromURL(config config.AccountFloorFet
if err != nil {
return nil, 0, errors.New("error while getting response from url : " + err.Error())
}
defer func() {
// read the entire response body to ensure full connection reuse if there's an
// invalid status code
if _, err := io.Copy(io.Discard, httpResp.Body); err != nil {
glog.Errorf("Error while draining fetched floor response body: %v", err)
}
httpResp.Body.Close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is Body.Close() neccessary? We do call Close() on this body in unpackResponse()s caller function FetchRequests(...)

80   func (fetcher *HttpFetcher) FetchRequests(ctx context.Context, requestIDs []string, impIDs []string) (requestData map[string]json.RawMessage, impData map[string]json.RawMessage, errs []error) {
81 *--  8 lines: if len(requestIDs) == 0 && len(impIDs) == 0 {---------------------------------------------------------------------------------------------------------
89  
90       httpResp, err := ctxhttp.Do(ctx, fetcher.client, httpReq)
91       if err != nil {
92           return nil, nil, []error{err}
93       }
94       defer httpResp.Body.Close()
95       requestData, impData, errs = unpackResponse(httpResp)
96       return
97   }
stored_requests/backends/http_fetcher/fetcher.go

Do we get a panic for calling Close() twice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I follow. unpackResponse does not close the body.

Copy link
Contributor

@guscarreon guscarreon Aug 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line 94 inside FetchRequests(...) defers the call to httpResp.Body.Close(). Is it ok if unpackResponse(httpResp) calls httpResp.Body.Close() and we do another httpResp.Body.Close() before FetchRequests(...) goes out of scope?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got this one wrong @SyntaxNode. Nevermind

}()

if httpResp.StatusCode != http.StatusOK {
return nil, 0, errors.New("no response from server")
Expand All @@ -291,7 +299,6 @@ func (f *PriceFloorFetcher) fetchFloorRulesFromURL(config config.AccountFloorFet
if err != nil {
return nil, 0, errors.New("unable to read response")
}
defer httpResp.Body.Close()

return respBody, maxAge, nil
}
Expand Down
2 changes: 2 additions & 0 deletions stored_requests/backends/http_fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func (fetcher *HttpFetcher) FetchRequests(ctx context.Context, requestIDs []stri
return nil, nil, []error{err}
}
defer httpResp.Body.Close()

requestData, impData, errs = unpackResponse(httpResp)
return
}
Expand Down Expand Up @@ -142,6 +143,7 @@ func (fetcher *HttpFetcher) FetchAccounts(ctx context.Context, accountIDs []stri
}
}
defer httpResp.Body.Close()

respBytes, err := io.ReadAll(httpResp.Body)
if err != nil {
return nil, []error{
Expand Down
1 change: 1 addition & 0 deletions stored_requests/events/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type HTTPEvents struct {
func (e *HTTPEvents) fetchAll() {
ctx, cancel := e.ctxProducer()
defer cancel()

resp, err := ctxhttp.Get(ctx, e.client, e.Endpoint)
if respObj, ok := e.parse(e.Endpoint, resp, err); ok &&
(len(respObj.StoredRequests) > 0 || len(respObj.StoredImps) > 0 || len(respObj.StoredResponses) > 0 || len(respObj.Accounts) > 0) {
Expand Down
Loading