Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions adapters/audienceNetwork/facebook.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ func Builder(bidderName openrtb_ext.BidderName, config config.Adapter, server co
return bidder, nil
}

func (a *adapter) MakeTimeoutNotification(req *adapters.RequestData) (*adapters.RequestData, []error) {
func (a *adapter) MakeTimeoutNotification(req *adapters.RequestData) (*adapters.RequestData, error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not directly related. Updated the function signature since only a single error is used.

var (
rID string
pubID string
Expand All @@ -445,15 +445,13 @@ func (a *adapter) MakeTimeoutNotification(req *adapters.RequestData) (*adapters.
// corresponding imp's ID
rID, err = jsonparser.GetString(req.Body, "id")
if err != nil {
return &adapters.RequestData{}, []error{err}
return &adapters.RequestData{}, err
}

// The publisher ID is expected in the app object
pubID, err = jsonparser.GetString(req.Body, "app", "publisher", "id")
if err != nil {
return &adapters.RequestData{}, []error{
errors.New("path app.publisher.id not found in the request"),
}
return &adapters.RequestData{}, errors.New("path app.publisher.id not found in the request")
}

uri := fmt.Sprintf("https://www.facebook.com/audiencenetwork/nurl/?partner=%s&app=%s&auction=%s&ortb_loss_code=2", a.platformID, pubID, rID)
Expand Down
2 changes: 1 addition & 1 deletion adapters/bidder.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type TimeoutBidder interface {
//
// Do note that if MakeRequests returns multiple requests, and more than one of these times out, MakeTimeoutNotice will be called
// once for each timed out request.
MakeTimeoutNotification(req *RequestData) (*RequestData, []error)
MakeTimeoutNotification(req *RequestData) (*RequestData, error)
}

// BidderResponse wraps the server's response with the list of bids and the currency used by the bidder.
Expand Down
7 changes: 7 additions & 0 deletions analytics/agma/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"compress/gzip"
"context"
"fmt"
"io"
"net/http"
"net/url"
"time"
Expand Down Expand Up @@ -74,6 +75,12 @@ func createHttpSender(httpClient *http.Client, endpoint config.AgmaAnalyticsHttp
glog.Errorf("[agmaAnalytics] Sending request failed %v", err)
return err
}
defer func() {
if _, err := io.Copy(io.Discard, resp.Body); err != nil {
glog.Errorf("[agmaAnalytics] Draining response body failed: %v", err)
}
Copy link
Contributor Author

@SyntaxNode SyntaxNode Aug 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The response body must be fully read and closed to ensure connection reuse. Discarding the body is especially needed when the body content is ignored / not used.

resp.Body.Close()
}()

if resp.StatusCode != http.StatusOK {
glog.Errorf("[agmaAnalytics] Wrong code received %d instead of %d", resp.StatusCode, http.StatusOK)
Expand Down
18 changes: 16 additions & 2 deletions analytics/pubstack/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,38 @@ package pubstack

import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"time"

"github.com/docker/go-units"
"github.com/golang/glog"
)

func fetchConfig(client *http.Client, endpoint *url.URL) (*Configuration, error) {
res, err := client.Get(endpoint.String())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to add a timeout to this fetch while we are here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree we should. I don't know the pubstack analytics module well enough to implement it. Will reach out to pubstack to implement.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline. I will reach out to pubstack to see what a reasonable timeout would be and we can update it in another PR.

if err != nil {
return nil, err
}
defer func() {
// read the entire response body to ensure full connection reuse if there's an
// error while decoding the json
if _, err := io.Copy(io.Discard, res.Body); err != nil {
glog.Errorf("[pubstack] Draining config response body failed: %v", err)
}
res.Body.Close()
}()

if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code: %d", res.StatusCode)
}

defer res.Body.Close()
c := Configuration{}
err = json.NewDecoder(res.Body).Decode(&c)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to decode config: %w", err)
}
return &c, nil
}
Expand Down
8 changes: 7 additions & 1 deletion analytics/pubstack/eventchannel/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package eventchannel
import (
"bytes"
"fmt"
"io"
"net/http"
"net/url"
"path"
Expand All @@ -27,7 +28,12 @@ func NewHttpSender(client *http.Client, endpoint string) Sender {
if err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to add a timeout just above to the http request?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree we should. I don't know the pubstack analytics module well enough to implement it. Will reach out to pubstack to implement.

return err
}
resp.Body.Close()
defer func() {
if _, err := io.Copy(io.Discard, resp.Body); err != nil {
glog.Errorf("[pubstack] Draining sender response body failed: %v", err)
}
resp.Body.Close()
}()

if resp.StatusCode != http.StatusOK {
glog.Errorf("[pubstack] Wrong code received %d instead of %d", resp.StatusCode, http.StatusOK)
Expand Down
11 changes: 8 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,15 +472,19 @@ type Analytics struct {
}

type CurrencyConverter struct {
FetchURL string `mapstructure:"fetch_url"`
FetchIntervalSeconds int `mapstructure:"fetch_interval_seconds"`
StaleRatesSeconds int `mapstructure:"stale_rates_seconds"`
FetchURL string `mapstructure:"fetch_url"`
FetchTimeoutMilliseconds int `mapstructure:"fetch_timeout_ms"`
FetchIntervalSeconds int `mapstructure:"fetch_interval_seconds"`
StaleRatesSeconds int `mapstructure:"stale_rates_seconds"`
}

func (cfg *CurrencyConverter) validate(errs []error) []error {
if cfg.FetchIntervalSeconds < 0 {
errs = append(errs, fmt.Errorf("currency_converter.fetch_interval_seconds must be in the range [0, %d]. Got %d", 0xffff, cfg.FetchIntervalSeconds))
}
if cfg.FetchTimeoutMilliseconds < 0 {
errs = append(errs, fmt.Errorf("currency_converter.fetch_timeout_ms must be 0 or greater. Got %d", cfg.FetchTimeoutMilliseconds))
}
return errs
}

Expand Down Expand Up @@ -1173,6 +1177,7 @@ func SetupViper(v *viper.Viper, filename string, bidderInfos BidderInfos) {
v.SetDefault("ccpa.enforce", false)
v.SetDefault("lmt.enforce", true)
v.SetDefault("currency_converter.fetch_url", "https://cdn.jsdelivr.net/gh/prebid/currency-file@1/latest.json")
v.SetDefault("currency_converter.fetch_timeout_ms", 60000) // 60 seconds
v.SetDefault("currency_converter.fetch_interval_seconds", 1800) // fetch currency rates every 30 minutes
v.SetDefault("currency_converter.stale_rates_seconds", 0)
v.SetDefault("default_request.type", "")
Expand Down
1 change: 1 addition & 0 deletions currency/currency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func TestGetAuctionCurrencyRates(t *testing.T) {

return NewRateConverter(
mockCurrencyClient,
60*time.Second,
"currency.fake.com",
24*time.Hour,
)
Expand Down
25 changes: 19 additions & 6 deletions currency/rate_converter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package currency

import (
"context"
"fmt"
"io"
"net/http"
Expand All @@ -16,6 +17,7 @@ import (
// RateConverter holds the currencies conversion rates dictionary
type RateConverter struct {
httpClient httpClient
httpTimeout time.Duration
staleRatesThreshold time.Duration
syncSourceURL string
rates atomic.Value // Should only hold Rates struct
Expand All @@ -27,11 +29,13 @@ type RateConverter struct {
// NewRateConverter returns a new RateConverter
func NewRateConverter(
httpClient httpClient,
httpTimeout time.Duration,
syncSourceURL string,
staleRatesThreshold time.Duration,
) *RateConverter {
return &RateConverter{
httpClient: httpClient,
httpTimeout: httpTimeout,
staleRatesThreshold: staleRatesThreshold,
syncSourceURL: syncSourceURL,
rates: atomic.Value{},
Expand All @@ -43,7 +47,10 @@ func NewRateConverter(

// fetch allows to retrieve the currencies rates from the syncSourceURL provided
func (rc *RateConverter) fetch() (*Rates, error) {
request, err := http.NewRequest("GET", rc.syncSourceURL, nil)
ctx, cancel := context.WithTimeout(context.Background(), rc.httpTimeout)
defer cancel()

request, err := http.NewRequestWithContext(ctx, "GET", rc.syncSourceURL, nil)
if err != nil {
return nil, err
}
Expand All @@ -52,23 +59,29 @@ func (rc *RateConverter) fetch() (*Rates, error) {
if err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to add a timeout to the http request just above?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Good catch. I added a configurable timeout with a default of 60 seconds. In test from my machine, latency is around 30ms. Since this isn't part of the real time auction flow, happy to set this to a higher limit considering there is no limit today. Hosts can reduce if they find it necessary or we can revisit the default later if there are no concerns right now.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline. We can address this in a follow-up PR.

return nil, err
}
defer func() {
// read the entire response body to ensure full connection reuse if there's an
// invalid status code
if _, err := io.Copy(io.Discard, response.Body); err != nil {
glog.Errorf("error draining conversion rates response body: %v", err)
}
response.Body.Close()
}()

if response.StatusCode >= 400 {
message := fmt.Sprintf("The currency rates request failed with status code %d", response.StatusCode)
message := fmt.Sprintf("the currency rates request failed with status code %d", response.StatusCode)
return nil, &errortypes.BadServerResponse{Message: message}
}

defer response.Body.Close()

bytesJSON, err := io.ReadAll(response.Body)
if err != nil {
return nil, err
return nil, fmt.Errorf("the currency rates request failed: %v", err)
}

updatedRates := &Rates{}
err = jsonutil.UnmarshalValid(bytesJSON, updatedRates)
if err != nil {
return nil, err
return nil, fmt.Errorf("the currency rates request failed to parse json: %v", err)
}

return updatedRates, err
Expand Down
4 changes: 4 additions & 0 deletions currency/rate_converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func TestReadWriteRates(t *testing.T) {
}
currencyConverter := NewRateConverter(
&http.Client{},
60*time.Second,
url,
24*time.Hour,
)
Expand Down Expand Up @@ -181,6 +182,7 @@ func TestRateStaleness(t *testing.T) {
// Execute:
currencyConverter := NewRateConverter(
&http.Client{},
60*time.Second,
mockedHttpServer.URL,
30*time.Second, // stale rates threshold
)
Expand Down Expand Up @@ -268,6 +270,7 @@ func TestRatesAreNeverConsideredStale(t *testing.T) {
// Execute:
currencyConverter := NewRateConverter(
&http.Client{},
60*time.Second,
mockedHttpServer.URL,
0*time.Millisecond, // stale rates threshold
)
Expand Down Expand Up @@ -321,6 +324,7 @@ func TestRace(t *testing.T) {
interval := 1 * time.Millisecond
currencyConverter := NewRateConverter(
mockedHttpClient,
60*time.Second,
"currency.fake.com",
24*time.Hour,
)
Expand Down
2 changes: 1 addition & 1 deletion endpoints/openrtb2/auction_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func BenchmarkOpenrtbEndpoint(b *testing.B) {
nilMetrics,
infos,
gdprPermsBuilder,
currency.NewRateConverter(&http.Client{}, "", time.Duration(0)),
currency.NewRateConverter(&http.Client{}, 60*time.Second, "", time.Duration(0)),
empty_fetcher.EmptyFetcher{},
&adscert.NilSigner{},
macros.NewStringIndexBasedReplacer(),
Expand Down
2 changes: 1 addition & 1 deletion endpoints/openrtb2/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1255,7 +1255,7 @@ func buildTestExchange(testCfg *testConfigValues, adapterMap map[openrtb_ext.Bid
}
}

mockCurrencyConverter := currency.NewRateConverter(mockCurrencyRatesServer.Client(), mockCurrencyRatesServer.URL, time.Second)
mockCurrencyConverter := currency.NewRateConverter(mockCurrencyRatesServer.Client(), time.Second, mockCurrencyRatesServer.URL, time.Second)
mockCurrencyConverter.Run()

gdprPermsBuilder := fakePermissionsBuilder{
Expand Down
26 changes: 18 additions & 8 deletions exchange/bidder.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ func (bidder *BidderAdapter) doRequestImpl(ctx context.Context, req *adapters.Re
err: err,
}
}
defer httpResp.Body.Close()

respBody, err := io.ReadAll(httpResp.Body)
if err != nil {
Expand All @@ -644,7 +645,6 @@ func (bidder *BidderAdapter) doRequestImpl(ctx context.Context, req *adapters.Re
err: err,
}
}
defer httpResp.Body.Close()

if httpResp.StatusCode < 200 || httpResp.StatusCode >= 400 {
if httpResp.StatusCode >= 500 {
Expand All @@ -671,14 +671,25 @@ func (bidder *BidderAdapter) doRequestImpl(ctx context.Context, req *adapters.Re
func (bidder *BidderAdapter) doTimeoutNotification(timeoutBidder adapters.TimeoutBidder, req *adapters.RequestData, logger util.LogMsg) {
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
toReq, errL := timeoutBidder.MakeTimeoutNotification(req)
if toReq != nil && len(errL) == 0 {

toReq, errMakeTimeoutNotification := timeoutBidder.MakeTimeoutNotification(req)
if toReq != nil && errMakeTimeoutNotification == nil {
httpReq, err := http.NewRequest(toReq.Method, toReq.Uri, bytes.NewBuffer(toReq.Body))
if err == nil {
httpReq.Header = req.Headers
httpResp, err := ctxhttp.Do(ctx, bidder.Client, httpReq)
if err == nil {
defer func() {
if _, err := io.Copy(io.Discard, httpResp.Body); err != nil {
glog.Errorf("TimeoutNotification: Draining response body failed %v", err)
}
httpResp.Body.Close()
}()
}

success := (err == nil && httpResp.StatusCode >= 200 && httpResp.StatusCode < 300)
bidder.me.RecordTimeoutNotice(success)

if bidder.config.Debug.TimeoutNotification.Log && !(bidder.config.Debug.TimeoutNotification.FailOnly && success) {
var msg string
if err == nil {
Expand All @@ -697,16 +708,15 @@ func (bidder *BidderAdapter) doTimeoutNotification(timeoutBidder adapters.Timeou
}
}
} else if bidder.config.Debug.TimeoutNotification.Log {
reqJSON, err := jsonutil.Marshal(req)
reqJSON, errMarshal := jsonutil.Marshal(req)
var msg string
if err == nil {
msg = fmt.Sprintf("TimeoutNotification: Failed to generate timeout request: error(%s), bidder request(%s)", errL[0].Error(), string(reqJSON))
if errMarshal == nil {
msg = fmt.Sprintf("TimeoutNotification: Failed to generate timeout request: error(%s), bidder request(%s)", errMarshal.Error(), string(reqJSON))
} else {
msg = fmt.Sprintf("TimeoutNotification: Failed to generate timeout request: error(%s), bidder request marshal failed(%s)", errL[0].Error(), err.Error())
msg = fmt.Sprintf("TimeoutNotification: Failed to generate timeout request: error(%s), bidder request marshal failed(%s)", errMakeTimeoutNotification.Error(), errMarshal.Error())
}
util.LogRandomSample(msg, logger, bidder.config.Debug.TimeoutNotification.SamplingRate)
}

}

type httpCallInfo struct {
Expand Down
Loading
Loading