Public Void @f2prateek

Powering up HTTP clients with Train

Middlewares make it easy to write independent and reusable modules for HTTP servers. For instance, we’ve abstracted common reporting with our statsd and logger middlewares. This makes it easy for new services to be built with reliable reporting from day one.

h1 := statsd.New(stats)(app)
h2 := logger.New()(h1)
http.ListenAndServe(":12345", h2)

Although chaining is easy enough to write ourselves, Alice is a tiny library that makes it easy to chain multiple middlewares.

chain := alice.New(logger.New(), statsd.New(stats))
http.ListenAndServe(":12345", chain.Then(app))

While working on sources, we realized we needed a similar solution for writing HTTP clients. At their core, sources are pretty simple: they make HTTP requests to a third-party service, translate the responses into our warehouse's format, and forward them to our warehouse objects API. The majority of errors come from failing HTTP requests. To have complete visibility into the performance of sources, we needed to record metrics and log the activity of the HTTP client across multiple codebases.

Borrowing from OkHttp’s interceptors, I wrote Train. At its core, Train takes a series of interceptors and combines them into a RoundTripper that runs the interceptors in order.

Powerful

Interceptors enable a wide range of use cases: they can observe, modify and even retry calls.

Observing Requests and Responses

Interceptors can continue the chain as is and observe outgoing requests and incoming responses. This is useful for reporting purposes, such as logging and stats.

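import (
  "fmt"
  "net/http"
  "net/http/httputil"

  "github.com/f2prateek/train"
)

// Dump prints each outgoing request and incoming response.
func Dump(chain train.Chain) (*http.Response, error) {
  req := chain.Request()
  if dump, err := httputil.DumpRequestOut(req, true); err == nil {
    fmt.Println(string(dump))
  }

  resp, err := chain.Proceed(req)
  if err != nil {
    return nil, err
  }
  if dump, err := httputil.DumpResponse(resp, true); err == nil {
    fmt.Println(string(dump))
  }

  return resp, nil
}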

Modifying Requests

Interceptors can modify outgoing requests. For example, you can compress the request body if your server supports it.

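import (
  "bytes"
  "compress/zlib"
  "io"
  "net/http"

  "github.com/f2prateek/train"
)

// Compress zlib-compresses request bodies that aren't already encoded.
func Compress(chain train.Chain) (*http.Response, error) {
  req := chain.Request()
  if req.Body != nil && req.Header.Get("Content-Encoding") == "" {
    var buf bytes.Buffer
    z := zlib.NewWriter(&buf)
    _, err := io.Copy(z, req.Body)
    req.Body.Close()
    if err != nil {
      return nil, err
    }
    if err := z.Close(); err != nil { // Flushes and writes the checksum.
      return nil, err
    }
    req.Body = io.NopCloser(&buf)
    req.ContentLength = int64(buf.Len())
    req.Header.Set("Content-Encoding", "zlib")
  }

  return chain.Proceed(req)
}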

Modifying Responses

Interceptors can modify incoming responses. Similar to above, you can decompress the response body before your application processes the response.

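import (
  "compress/zlib"
  "io"
  "net/http"

  "github.com/f2prateek/train"
)

func Decompress(chain train.Chain) (*http.Response, error) {
  req := chain.Request()
  resp, err := chain.Proceed(req)
  if err != nil {
    return nil, err
  }

  if resp.Body != nil && resp.Header.Get("Content-Encoding") == "zlib" {
    z, err := zlib.NewReader(resp.Body)
    if err != nil {
      resp.Body.Close()
      return nil, err
    }
    // Read through z, but close the original body to release the connection.
    resp.Body = struct {
      io.Reader
      io.Closer
    }{z, resp.Body}
    resp.Header.Del("Content-Encoding")
    resp.ContentLength = -1
  }

  return resp, nil
}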

Short Circuiting

Interceptors can short-circuit the chain by returning without calling Proceed, which makes them great for testing.

func Short(train.Chain) (*http.Response, error) {
  return nil, errors.New("somebody set up us the bomb")
}

Pluggable

Like HTTP server middlewares, interceptors make it easy to share common HTTP client logic. Interceptors plug into an HTTP client as its transport.

transport := train.Transport(logger.New(), statsd.New(stats))
client := &http.Client{
  Transport: transport,
}

This also makes it easy to plug into client libraries built by other developers. For instance, we plugged our logging and stats interceptors into the Intercom Go library and got logs and metrics for free, without modifying its source.

t := train.Transport(logger.New(), statsd.New(stats))

return &interfaces.IntercomHTTPClient{
  Client: &http.Client{
    Transport: t,
  },
}

Chainable

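Interceptors build upon the Chain interface: each interceptor receives a Chain, reads the current request from it, and calls Proceed to hand a request to the next interceptor in line.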

Interceptors are consulted in the order they are provided. You’ll need to decide what order you want your interceptors to be called in.

For example, this chain will record stats about the compressed request and response. The stats interceptor is invoked after the compression interceptor compresses the request and before the compression interceptor decompresses the response.

transport := train.Transport(compress, log, stats)

The second example will record stats about the decompressed request and response. The stats interceptor is invoked before the compression interceptor compresses the request and after the compression interceptor decompresses the response.

transport := train.Transport(log, stats, compress)

Extensible

Train is designed to be extensible. We’ve been using it for a while to power up the standard library HTTP client in our Go sources, including adding logging, fixing server errors, and collecting stats for our invaluable Datadog dashboards.

Try it out and let me know what you think!

Gzip

This week we enabled gzip for our mobile libraries. Gzip is a compression format widely used in HTTP networking. With gzip, we saw an over 10x reduction in the size of the POST request bodies we use to upload batched event data.

Our Tracking API uses (mostly) vanilla Go. Enabling gzip decompression was a breeze using the compress/gzip package (thanks Amir).

func (s *Server) handle(w http.ResponseWriter, r *http.Request) {
  encoding := r.Header.Get("Content-Encoding")
  if encoding == "gzip" {
    z, err := gzip.NewReader(r.Body)
    if err != nil {
      http.Error(w, "malformed gzip content", http.StatusBadRequest)
      return
    }

    defer z.Close()
    r.Body = z
  }
  ...
}

On Android, we can take advantage of GZIPOutputStream from the Java standard library.

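import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.zip.GZIPOutputStream;

void post(byte[] data) throws IOException {
  URL url = new URL("https://api.segment.io/v1/import");
  HttpURLConnection conn = (HttpURLConnection) url.openConnection();
  conn.setRequestMethod("POST");
  conn.setDoOutput(true);
  conn.setRequestProperty("Content-Encoding", "gzip");
  conn.setRequestProperty("Content-Type", "application/json");
  OutputStream os = conn.getOutputStream();
  OutputStream gzipped = new GZIPOutputStream(os);
  gzipped.write(data);
  // Closing the gzip stream writes its trailer and closes os.
  gzipped.close();
  ...
}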

Adding iOS support was the most challenging of the three. There are no standard library APIs for gzipping data, so we pulled in the relevant code from Nick Lockwood’s implementation on GitHub. The final snippet is tiny and fits perfectly as an NSData category.

#import "NSData+GZIP.h"

- (void)sendData:(NSData *)data
{
    NSMutableURLRequest *urlRequest = [NSMutableURLRequest requestWithURL:[NSURL URLWithString:@"https://api.segment.io/v1/import"]];
    [urlRequest setValue:@"gzip" forHTTPHeaderField:@"Content-Encoding"];
    [urlRequest setValue:@"application/json" forHTTPHeaderField:@"Content-Type"];
    [urlRequest setHTTPMethod:@"POST"];
    [urlRequest setHTTPBody:[data gzippedData]];
    ...
}

Implementing it across our different code bases was surprisingly easy. We went from discussion to a working implementation across our server and mobile libraries within a day, and the savings definitely made it worth our time!

Factory functions in Go

Static factory methods are extremely powerful in Java, as noted in Item 1 of Effective Java. Here’s how that item translates to factory functions in Go.

There are a few ways to create a value of a struct.

// Allocating memory with new.
buf1 := new(bytes.Buffer) // type *bytes.Buffer

// Simply declaring the variable.
var buf2 bytes.Buffer // type bytes.Buffer

// Using a struct initializer.
buf3 := bytes.Buffer{} // type bytes.Buffer

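Structs that cannot be used in their zero value need their fields set when they are created, for example with a struct initializer. This line, from the NewFile constructor in Effective Go, runs inside package os:

package os

f := File{fd, name, nil, 0}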

There is another way for packages to let clients allocate and initialize structs. A package can provide a factory function: an exported function that returns an initialized struct, built using one of the options above. Here’s an example from the strings package. This function returns a pointer to a newly initialized strings.Reader.

func NewReader(s string) *Reader {
  return &Reader{s, 0, -1}
}

One advantage of factory functions is that, unlike initializers, they can run initialization logic. Packages can use this for a variety of purposes. For example, http.NewRequest returns an error if the given URL string cannot be parsed. Packages can also use this to initialize internal dependencies: the http.NewServeMux function transparently initializes an unexported field.

package http

func NewServeMux() *ServeMux {
  return &ServeMux{m: make(map[string]muxEntry)}
}

package foo

mux := http.NewServeMux()

Without a factory function, both the field m and the type muxEntry would have to be exported as part of the package’s public API, additionally forcing clients to initialize them.

package foo

mux := &http.ServeMux{M: make(map[string]http.MuxEntry)}

A second advantage of factory functions is that, unlike initializers, they are not required to create a new value each time they’re invoked. This allows packages to return preinitialized values, or to cache values as they’re created and dispense them repeatedly to avoid unnecessary allocations. All of these implementations can sit behind a single factory function API, giving packages control over behavior and performance characteristics as needed.
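As an illustration of caching, here is a sketch of a factory function that hands out one shared value per name. Color and NewColor are hypothetical names; the mutex makes the cache safe for concurrent callers.

```go
package main

import "sync"

// Color is a hypothetical value type used to illustrate caching.
type Color struct {
	name string
}

var (
	mu     sync.Mutex
	colors = map[string]*Color{}
)

// NewColor returns the same *Color for a given name, creating it only on
// first use. Callers can't tell whether the value is fresh or cached.
func NewColor(name string) *Color {
	mu.Lock()
	defer mu.Unlock()
	if c, ok := colors[name]; ok {
		return c
	}
	c := &Color{name: name}
	colors[name] = c
	return c
}
```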

A third advantage of factory functions is that, unlike initializers, when their return type is an interface they can return a value of any type that implements it. This gives packages greater flexibility in choosing the type of the returned value. One application of this flexibility is that an API can return an interface without making the concrete type public. For example, the errors.New function returns an error interface and not *errorString. This requires callers to reference the return value by its interface rather than its concrete type. In a future version, Go could completely remove the errorString type and replace it with another implementation of error without breaking API compatibility. Consumers of the API would be none the wiser.
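Here is the same pattern as errors.New, with hypothetical names: the factory's declared return type is an interface and the concrete type is unexported, so the package is free to swap the implementation later.

```go
package main

// Shape is the only type callers need to know about.
type Shape interface {
	Area() float64
}

// square is unexported, so callers can only use it through Shape.
type square struct {
	side float64
}

func (s square) Area() float64 { return s.side * s.side }

// NewSquare returns a Shape; a later version could return a different
// implementation without breaking callers.
func NewSquare(side float64) Shape {
	return square{side: side}
}
```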

The main disadvantage of factory functions is that, unlike initializers, they are not readily distinguishable from other functions. This makes it difficult to figure out how to use a struct that must be initialized via a factory function. The best workaround for now is to rally around established conventions. Factory functions are typically named New (as in errors.New) or New{type} (as in bufio.NewReader).

A second disadvantage of factory functions is that, unlike initializers, they don’t support named arguments. This makes consumer code harder to read, since it isn’t obvious what each value represents.

package foo

// Using a factory function.
req, _ := http.NewRequest("GET", "http://f2prateek.com", nil)

// Using a struct initializer. MustParse is a hypothetical helper that
// calls url.Parse and panics on error.
req := &http.Request{
  Method: "GET",
  URL:    MustParse("http://f2prateek.com"),
  Body:   nil,
}

Arguably, the former can be easier to comprehend since the http.Request type contains a total of 19 fields. This makes it confusing for new consumers, who have no idea which fields are required to correctly initialize a request. Using a factory function allows the http package to highlight the fields required for proper initialization.

Rx Preferences

Android’s SharedPreferences offers a convenient mechanism to persist a collection of key-value pairs.

Its simple API is limiting for a few reasons:

  • Callers must always know the preference key and type.
  • No support for storing custom types out of the box.
  • Callers cannot listen for changes to individual keys.

RxPreferences is a new(ish) library that builds on top of SharedPreferences to solve these problems, and takes it further by integrating with RxJava.

Typed Preferences

SharedPreferences requires callers to always know what key identifies a preference when they get or save a preference. Callers also need to keep track of what type was used for a preference (did the preference use a float or int), which can lead to subtle bugs.

SharedPreferences preferences = getDefaultSharedPreferences(this);
preferences.edit().putFloat("scale", 3.14f).commit();

// Compiles, but throws a ClassCastException at runtime.
preferences.getInt("scale", 0);

RxPreferences introduces a Preference class that captures the key used to store a preference and the type of data it holds, so the compiler catches such bugs. RxSharedPreferences provides factory methods that promote preferences to objects.

SharedPreferences preferences = getDefaultSharedPreferences(this);
RxSharedPreferences rxPrefs = RxSharedPreferences.create(preferences);
Preference<Integer> foo = rxPrefs.getInt("foo");

foo.set(3.14f); // Will not compile!

The Preference class provides methods that replace their counterparts in SharedPreferences and SharedPreferences.Editor. This makes it convenient to use them as the source of truth, instead of sharing String constants throughout your app.

class Preference<T> {
    // Method bodies omitted.

    // Equivalent to SharedPreferences#get….
    T get();

    // Equivalent to SharedPreferences#contains.
    boolean isSet();

    // Equivalent to SharedPreferences.Editor#put….
    void set(T value);

    // Equivalent to SharedPreferences.Editor#remove.
    void delete();
}

BYOA

SharedPreferences restricts you to a limited set of types: boolean, float, int, long, String and Set<String>. Persisting custom types is doable, but looks awkward.

@Inject SharedPreferences preferences;

// Gets unwieldy when repeated in 10 different places.
String serialized = preferences.getString("point", null);
if (serialized != null) {
  Point point = Point.parse(serialized);
  preferences.edit().putString("point", point.toString()).apply();
}

RxPreferences introduces a pluggable Adapter abstraction. An Adapter can store and retrieve values of an arbitrary type, and consolidates your serialization logic into a single location.

public interface Adapter<T> {
  T get(String key, SharedPreferences preferences);

  void set(String key, T value, Editor editor);
}

RxPreferences provides built-in adapters for all the types supported by SharedPreferences, as well as for enums. Writing a custom adapter is trivial. You can even use your favorite serialization library!

class GsonPreferenceAdapter<T> implements Adapter<T> {
  final Gson gson;
  private Class<T> clazz;

  // Constructor and exception handling omitted for brevity.

  @Override
  public T get(String key, SharedPreferences preferences) {
    return gson.fromJson(preferences.getString(key, null), clazz);
  }

  @Override
  public void set(String key, T value, Editor editor) {
    editor.putString(key, gson.toJson(value));
  }
}

Then, simply let RxPreferences know which adapter you want to use.

GsonPreferenceAdapter<Point> adapter
    = new GsonPreferenceAdapter<>(gson, Point.class);
Preference<Point> pref = rxPrefs.getObject("point", null, adapter);

// Easy Peasy!
Point point = pref.get();
pref.set(point);
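For readers coming from Go, here is a sketch of the same design there: a typed Preference bound to a key and an Adapter, with a JSON adapter playing the role of the Gson one. It uses Go generics (1.18+), and every name in it (Store, Adapter, Preference, IntAdapter, JSONAdapter) is hypothetical; it is not a port of RxPreferences. The adapter methods are called Decode and Encode to avoid clashing with the Preference's own Get and Set.

```go
package main

import (
	"encoding/json"
	"strconv"
)

// Store stands in for SharedPreferences: a flat map of string values.
type Store map[string]string

// Adapter converts between a T and the string that is actually stored.
type Adapter[T any] interface {
	Decode(s string) (T, error)
	Encode(v T) (string, error)
}

// Preference binds a key, a default value and an adapter together, so
// callers never repeat the key or the serialization logic.
type Preference[T any] struct {
	store   Store
	key     string
	def     T
	adapter Adapter[T]
}

// Get returns the stored value, or the default if the key is unset.
func (p Preference[T]) Get() (T, error) {
	s, ok := p.store[p.key]
	if !ok {
		return p.def, nil
	}
	return p.adapter.Decode(s)
}

// Set encodes v with the adapter and stores it under the key.
func (p Preference[T]) Set(v T) error {
	s, err := p.adapter.Encode(v)
	if err != nil {
		return err
	}
	p.store[p.key] = s
	return nil
}

// IsSet reports whether a value is stored under the key.
func (p Preference[T]) IsSet() bool {
	_, ok := p.store[p.key]
	return ok
}

// Delete removes the stored value.
func (p Preference[T]) Delete() {
	delete(p.store, p.key)
}

// IntAdapter stores ints as decimal strings.
type IntAdapter struct{}

func (IntAdapter) Decode(s string) (int, error) { return strconv.Atoi(s) }
func (IntAdapter) Encode(v int) (string, error) { return strconv.Itoa(v), nil }

// JSONAdapter stores any JSON-encodable type, like the Gson adapter above.
type JSONAdapter[T any] struct{}

func (JSONAdapter[T]) Decode(s string) (T, error) {
	var v T
	err := json.Unmarshal([]byte(s), &v)
	return v, err
}

func (JSONAdapter[T]) Encode(v T) (string, error) {
	b, err := json.Marshal(v)
	return string(b), err
}
```

Because the adapter is part of the Preference's type, `Preference[int]` can only ever be set with an int, which is the same compile-time guarantee the Java version provides.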

Reactive Bindings

OnSharedPreferenceChangeListener requires that listeners observe changes to all keys. Callers must filter values for the keys they’re interested in.

@Inject SharedPreferences prefs;

prefs.registerOnSharedPreferenceChangeListener((sharedPrefs, key) -> {
  // This is a firehose of information!
  // Ignore keys we aren't interested in.
  if (!FOO_KEY.equals(key)) return;

  boolean foo = sharedPrefs.getBoolean(key, false);
  System.out.println(foo);
});

The Preference class integrates with RxJava, and lets you observe changes to a single preference directly. Internally, RxPreferences shares a single listener amongst all Preference objects to avoid unnecessary work.
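A rough Go analog of that design, meant to show the idea rather than RxPreferences' implementation: one store-level hook (SetBool) routes each change only to the observers registered for that key. Prefs, Observe and SetBool are hypothetical names, and emitting the current value on subscription is a choice made for this sketch.

```go
package main

import "sync"

// Prefs is a hypothetical boolean preference store whose single change
// hook, SetBool, notifies only the observers of the key that changed.
type Prefs struct {
	mu        sync.Mutex
	values    map[string]bool
	observers map[string][]func(bool)
}

func NewPrefs() *Prefs {
	return &Prefs{
		values:    map[string]bool{},
		observers: map[string][]func(bool){},
	}
}

// Observe calls fn with the key's current value, then on every change.
func (p *Prefs) Observe(key string, fn func(bool)) {
	p.mu.Lock()
	p.observers[key] = append(p.observers[key], fn)
	current := p.values[key]
	p.mu.Unlock()
	fn(current)
}

// SetBool stores a value and notifies the observers of that key only.
func (p *Prefs) SetBool(key string, value bool) {
	p.mu.Lock()
	p.values[key] = value
	fns := append([]func(bool){}, p.observers[key]...)
	p.mu.Unlock()
	// Notify outside the lock so observers can call back into Prefs.
	for _, fn := range fns {
		fn(value)
	}
}
```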

@Inject @FooPreference BooleanPreference fooPreference;

fooPreference.asObservable()
  .subscribe((enabled) -> System.out.println(enabled));

SharedPreferences+++

RxPreferences also lets you take actions on preferences to update or delete values. This makes it straightforward to set up complex pipelines by combining it with other libraries in the RxJava family.

For example, RxPreferences and RxBinding can be combined to hand-roll your own simplified CheckBoxPreference.

@Inject @LocationPreference BooleanPreference locationPreference;
@BindView(R.id.check_box) CheckBox checkBox;

// Update the checkbox when the preference changes.
locationPreference.asObservable()
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(RxCompoundButton.checked(checkBox));

// Update preference when the checkbox state changes.
RxCompoundButton.checkedChanges(checkBox)
  .skip(1) // Skip the initial value.
  .subscribe(locationPreference.asAction());

RxPreferences v1

RxPreferences makes it convenient to interact with SharedPreferences, and its RxJava integration makes it easy to express complex logic that would otherwise be tedious and brittle. RxPreferences is available on Maven Central. Check the GitHub repo or u2020 to see more examples.

Happy persisting!

Thanks to Jake Wharton for polishing the API, and to Diana Smith for reading drafts of this post.

Cleaning up subscriptions

When you create a new Observable, you’ll often want to clean up whenever a Subscriber unsubscribes from the Observable. The Subscriber class exposes a handy add method that does exactly what we want.

For instance, here’s how you would create an Observable that emits the keys that were changed for some SharedPreferences. It registers a new OnSharedPreferenceChangeListener when a subscriber subscribes, and automatically unregisters the listener when the subscriber unsubscribes.

Observable.create((subscriber) -> {
  final OnSharedPreferenceChangeListener listener =
      (sharedPreferences, key) -> subscriber.onNext(key);

  preferences.registerOnSharedPreferenceChangeListener(listener);

  // Unregister the listener when the subscriber unsubscribes.
  subscriber.add(Subscriptions.create(() -> {
    preferences.unregisterOnSharedPreferenceChangeListener(listener);
  }));
});
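Go has no Subscriber.add, but the same register/unregister pairing can be expressed by returning a stop function alongside a channel. In this sketch, Notifier is a hypothetical stand-in for the SharedPreferences listener API, and KeyChanges plays the role of the Observable above.

```go
package main

import "sync"

// Notifier is a hypothetical stand-in for a listener registration API.
type Notifier struct {
	mu        sync.Mutex
	listeners map[int]func(key string)
	nextID    int
}

func (n *Notifier) Register(fn func(key string)) int {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.listeners == nil {
		n.listeners = map[int]func(string){}
	}
	id := n.nextID
	n.nextID++
	n.listeners[id] = fn
	return id
}

func (n *Notifier) Unregister(id int) {
	n.mu.Lock()
	defer n.mu.Unlock()
	delete(n.listeners, id)
}

// Notify calls every listener while holding the lock, so once Unregister
// returns, that listener will not be called again.
func (n *Notifier) Notify(key string) {
	n.mu.Lock()
	defer n.mu.Unlock()
	for _, fn := range n.listeners {
		fn(key)
	}
}

// KeyChanges registers a listener that forwards changed keys to a channel.
// The returned stop function plays the role of the Subscription added to
// the Subscriber: it unregisters the listener and closes the channel.
func KeyChanges(n *Notifier) (<-chan string, func()) {
	out := make(chan string, 16)
	id := n.Register(func(key string) {
		select {
		case out <- key:
		default: // Drop keys rather than block Notify when the buffer is full.
		}
	})
	var once sync.Once
	stop := func() {
		once.Do(func() {
			n.Unregister(id)
			close(out)
		})
	}
	return out, stop
}
```

Unregistering before closing the channel guarantees the listener can never send on a closed channel.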

From outside the Observable, you can also register actions to run each time a Subscriber unsubscribes, using the doOnUnsubscribe operator.