REST Web Service in Go - Encapsulating a Worker Pool

I wanted to make performance improvements in a PHP application recently. One section of code was doing a lot of number crunching and performance was just not great. I looked at creating a PHP extension in C to do the calculations. This might have helped but the thought of working in C felt a bit daunting. I told myself that this would be a good opportunity to explore Go a little further by trying to solve this issue with goroutines.

I had been reading that Go has great support for concurrency. From what I was reading, the concepts around goroutines and channels make it easy for a beginner to write concurrent programs. With that in mind, I gave Go a shot. The plan was to create a web service that would take in a list of input values and spit out the calculations. The calculations would be done concurrently by goroutines. Results would be collected as they were ready and sent off as JSON once all goroutines were done. This would be exposed as a REST endpoint that our PHP code would call.

I didn’t know if creating a web service to do concurrent calculations would make a difference in performance. Boy was I ever nicely surprised. The performance increase was phenomenal. The added overhead of the web service call in PHP to the REST endpoint was negligible.

In this blog post, I’ll write a very simple Go web service that does concurrent calculations and returns a set of results as JSON. I’ll also create a PHP app that calls the Go web service using Guzzle and prints out the results. I hope this post will be a good starting point for readers exploring how Go can take over some of the heavy lifting in an existing application. I have had great results in my own context using the approach outlined here. However, please note that I don’t have a whole lot of experience in Go at this point. Readers should judge for themselves whether the approach is a good fit for their context.

The REST Endpoint

The first thing we’re going to do is write the REST endpoint for our service. We’ll need to deal with routing requests. Specifically, we need to match requested URLs against a list of known patterns and call handlers for these patterns. For this, we’ll use the gorilla/mux package from the Gorilla web toolkit. This router is overkill for this particular example, since we’re just going to route requests based on exact URL matches. I like to use gorilla/mux because it’s very flexible in terms of the options available for matching routes to handlers. The standard http.ServeMux is good but requires a lot more work to do any fancy routing.

To install gorilla/mux you have to type the following command :


go get github.com/gorilla/mux

To use gorilla/mux in your code you have to import it like so :


import (
    "github.com/gorilla/mux"
)

For the example service, we’ll create a server to handle requests made at the http://localhost:8080/calcs URL. Our initial source code looks like this :


package main

import (
    "io"
    "log"
    "net/http"
    "github.com/gorilla/mux"
    "strconv"
)

//CalcsHandler is a struct that implements the http.Handler interface
type CalcsHandler struct {}

//ServeHTTP method is bound to the CalcsHandler struct. 
//Implementing the http.Handler interface
func (handler *CalcsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    io.WriteString(w, "I will perform calculations for you...")
}


func main() {
    //The port our server will be listening on
    port := 8080 

    //Our fancy Gorilla mux router
    router := mux.NewRouter() 

    //Our Routes : 
    //For now, will route all (GET,POST,etc) requests on /calcs URL to 
    // CalcsHandler's ServeHTTP function
    router.Handle("/calcs", &CalcsHandler{})  


    log.Printf("Starting server. Listening on port %d", port)
    err := http.ListenAndServe(":" + strconv.Itoa(port), router)
    if err != nil {
        log.Fatal("ListenAndServe: ", err)
    }
}

There is not much to say about this program. The important parts are commented in the source. The one thing to note is that we’re using the Gorilla mux router and we have a CalcsHandler struct that implements the http.Handler interface. All requests made to the /calcs URL will end up firing the CalcsHandler’s ServeHTTP method.

We’re going to need to process the JSON that is posted to this endpoint. We’re only really interested in POST requests, so let’s change one line in our application to limit the request type to POST.

Adding the .Methods("POST") line in our routing code will ensure that the server only responds to POST requests on that URL.


router.Handle("/calcs", &CalcsHandler{}).Methods("POST")  

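Note that if you hit the server with a GET request at the /calcs URL after making the above change, the server should respond with an error instead of the greeting. I got a 404; newer gorilla/mux releases answer a method mismatch with 405 Method Not Allowed instead.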
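To make the effect of the method restriction a little more concrete, here is a minimal sketch of the same idea using only the standard library. This is not how gorilla/mux implements .Methods("POST"). The postOnly wrapper and the statusFor helper are names I made up for illustration, and the sketch chooses to answer with 405 Method Not Allowed.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// postOnly is a hypothetical helper (not part of gorilla/mux): it wraps a
// handler and rejects every request whose method is not POST.
func postOnly(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			w.Header().Set("Allow", http.MethodPost)
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		next.ServeHTTP(w, r)
	})
}

// statusFor sends a body-less request with the given method to /calcs
// and returns the status code the wrapped handler answers with.
func statusFor(method string) int {
	handler := postOnly(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		io.WriteString(w, "I will perform calculations for you...")
	}))
	rec := httptest.NewRecorder()
	handler.ServeHTTP(rec, httptest.NewRequest(method, "/calcs", nil))
	return rec.Code
}

func main() {
	fmt.Println("GET :", statusFor(http.MethodGet))
	fmt.Println("POST:", statusFor(http.MethodPost))
}
```

The httptest recorder lets us exercise the handler without opening a port, which is handy for quick checks like this one.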

We’re now ready to accept some JSON on this endpoint. Our simulated web service will accept a list of integer pairs. It will add up each pair and spit out the results as JSON along with the original submitted integer pairs.

The input data for the service will look like this :


{
    "table": [
        {
            "Index": 0,
            "NumA": 1,
            "NumB": 2
        },
        {
            "Index": 1,
            "NumA": 3,
            "NumB": 4
        }
    ]
}

The response will look like this :


[
    {
        "Index": 0,
        "NumA": 1,
        "NumB": 2,
        "Result": 3
    },
    {
        "Index": 1,
        "NumA": 3,
        "NumB": 4,
        "Result": 7
    }
]

Notice that the "Result" : xx field and the original input data are both included in our service’s response.

Let’s deal with processing the submitted JSON first. We’ll do the concurrent calculations later. To process the JSON, we have to change our CalcsHandler’s ServeHTTP method. But first we’ll need a few structs to represent the data. We will use one struct to represent the input and another to represent the output. We will create these structs in a separate package we’ll call domain. Our domain package structs will look like :


package domain

type Input struct{
    Index   int
    NumA    int64
    NumB    int64
}

type Output struct{
    Index   int
    NumA    int64
    NumB    int64
    Result  int64
}

The package will also have functions to process the input and generate the output JSON. The complete domain package will look like this :


package domain

import (
    "encoding/json"
    "sort"
    "log"
)

type Input struct{
    Index   int
    NumA    int64
    NumB    int64
}

type Output struct{
    Index   int
    NumA    int64
    NumB    int64
    Result  int64
}

//For more information on unmarshalling, see :
//-  http://mattyjwilliams.blogspot.ca/2013/01/using-go-to-unmarshal-json-lists-with.html
// Takes a byte slice and generates an Input slice
func ProcessJsonInput(inputData []byte) []Input {
    collection := []Input{}
    var data map[string][]json.RawMessage
    err := json.Unmarshal(inputData, &data)
    if err != nil {
        log.Println(err)
        return collection
    }
    for _, thing := range data["table"] {
        collection = addInput(thing, collection)

    }
    return collection
}

//The results of the workers' calculations are accumulated in a map. 
//We generate our JSON result with this map. Before we can send 
//it back as a marshalled map of Output structs, we want to sort it by key. 
//The key for the map is the Index
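func GenerateJsonOutput(output map[int]Output) ([]byte, error) {
    //The Output is not sorted by index.
    //We sort it by Index prior to returning the response
    //The keys slice starts with length 0 so append doesn't leave leading zeros
    keys := make([]int, 0, len(output))
    for k := range output {
        keys = append(keys, k)
    }
    sort.Ints(keys)

    //Append in key order rather than indexing by key, so Index values
    //that are not exactly 0..n-1 cannot write out of range
    sorted := make([]Output, 0, len(output))
    for _, k := range keys {
        sorted = append(sorted, output[k])
    }
    return json.Marshal(sorted)
}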

//Takes a json.RawMessage, converts it to an Input struct
//and adds it to our collection
func addInput(thing json.RawMessage, collection []Input) []Input {
    input := Input{}
    err := json.Unmarshal(thing, &input)

    if err != nil {
        log.Println(err)        
    } else {
        //Skip elements that decoded to the zero value (an empty object, for example)
        if input != (Input{}) {
            collection = append(collection, input)
        }
    }

    return collection
}

The important thing to note from the code above is that we’ve added two functions that are available to the outside world. The first one is ProcessJsonInput and the second one is GenerateJsonOutput. These functions are responsible for unmarshalling and marshalling our input and output.
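For comparison, here is a sketch of an alternative way to do the same marshalling work by decoding straight into a typed wrapper struct and sorting with sort.Slice. It is not the code used in the rest of this post. The payload type and the decodeInputs and encodeSorted function names are assumptions of mine. Note one behavioral difference: with this approach a single malformed element makes the whole decode fail, whereas ProcessJsonInput logs the error and skips that element.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

type Input struct {
	Index int
	NumA  int64
	NumB  int64
}

type Output struct {
	Index  int
	NumA   int64
	NumB   int64
	Result int64
}

// payload is a hypothetical wrapper matching the {"table": [...]} envelope.
type payload struct {
	Table []Input `json:"table"`
}

// decodeInputs unmarshals the whole request body in one step.
func decodeInputs(data []byte) ([]Input, error) {
	var p payload
	if err := json.Unmarshal(data, &p); err != nil {
		return nil, err
	}
	return p.Table, nil
}

// encodeSorted flattens the result map into a slice ordered by Index
// and marshals it to JSON.
func encodeSorted(results map[int]Output) ([]byte, error) {
	sorted := make([]Output, 0, len(results))
	for _, o := range results {
		sorted = append(sorted, o)
	}
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].Index < sorted[j].Index })
	return json.Marshal(sorted)
}

func main() {
	inputs, err := decodeInputs([]byte(`{"table":[{"Index":0,"NumA":1,"NumB":2},{"Index":1,"NumA":3,"NumB":4}]}`))
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	results := make(map[int]Output)
	for _, in := range inputs {
		results[in.Index] = Output{Index: in.Index, NumA: in.NumA, NumB: in.NumB, Result: in.NumA + in.NumB}
	}
	out, err := encodeSorted(results)
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	fmt.Println(string(out))
}
```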

We can now use this new domain package in our main package. The CalcsHandler’s ServeHTTP method will use it to process the posted input data and generate output data. For now, we’ll just take in some data and post back some results without doing any calculations. After a little refactoring, our main package code will look like the following :


package main

import (
    "io"
    "log"
    "net/http"
    "github.com/gorilla/mux"
    "strconv"
    domain "github.com/redsofa/worker-pool-rest/domain"
    "io/ioutil"
    "fmt"
)

//CalcsHandler is a struct that implements the http.Handler interface
type CalcsHandler struct {}

// ServeHTTP method is bound to the CalcsHandler struct. 
// Implementing the http.Handler interface
func (handler *CalcsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    // For more information on making a RESTful JSON API in Go see:
    // - http://thenewstack.io/make-a-restful-json-api-go/

    //The io.LimitReader function is there to protect against malicious attacks on the server
    //It prevents us from having to deal with very large request bodies...

    // LimitReader returns a Reader that reads from r but stops with EOF after
    // n bytes. We limit this to roughly 2 MB
    body, err := ioutil.ReadAll(io.LimitReader(r.Body, 2048666))
    if err != nil {
        log.Println(err)
        panic(err)
    }
    if err := r.Body.Close(); err != nil {
        log.Println(err)
        panic(err)
    }

    //The input list that the user submitted to the server
    //The ProcessJsonInput function returns a slice of Inputs
    inputCollection := domain.ProcessJsonInput(body)
    //Make sure the user submitted something...
    if len(inputCollection) <= 0 {
        http.Error(w, "{\"error\": \"empty input list submitted.\"}", 500)
        return
    }

    //The res map will be used to collect results of our workers...
    res := make(map[int]domain.Output)

    //For now we'll just fake the process that does the concurrent calculations
    //and return 0s for the results
    for _, v := range inputCollection {
       output := domain.Output{v.Index, v.NumA, v.NumB, 0}

       res[v.Index] = output
    }

    //Set response header information
    w.Header().Set("Content-Type", "application/json; charset=UTF-8")
    w.WriteHeader(http.StatusCreated)

    //Create json with all results...
    response, err := domain.GenerateJsonOutput(res)
    if err != nil {
        log.Println(err)
        panic(err)
    }

    //Fprint rather than Fprintf : the response is data, not a format string
    fmt.Fprint(w, string(response))
}


func main() {
    //The port our server will be listening on
    port := 8080 

    //Our fancy Gorilla mux router
    router := mux.NewRouter() 

    //Our Routes : 
    //We're only interested in POST requests on /calcs URL
    router.Handle("/calcs", &CalcsHandler{}).Methods("POST")  


    log.Printf("Starting server. Listening on port %d", port)
    err := http.ListenAndServe(":" + strconv.Itoa(port), router)
    if err != nil {
        log.Fatal("ListenAndServe: ", err)
    }
}

The important parts of the code above are commented. The thing to note is that we’re now importing our new package and using it. See import statement : domain "github.com/redsofa/worker-pool-rest/domain". Our CalcsHandler’s ServeHTTP method now makes use of the ProcessJsonInput() and GenerateJsonOutput() functions to process the input and generate some simulated output.
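One detail of the handler worth a closer look is the io.LimitReader call. As far as I understand it, LimitReader simply stops reading at the limit without reporting an error, so an oversized body shows up later as a JSON parsing failure. The standard library also offers http.MaxBytesReader, which returns an error once the limit is exceeded. The sketch below compares the two on a plain string. The readTruncated and readStrict helper names are mine, and the code assumes Go 1.16 or later for io.ReadAll and io.NopCloser.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// readTruncated mimics the approach used in the handler: io.LimitReader
// silently stops after limit bytes and reports no error.
func readTruncated(body string, limit int64) (string, error) {
	b, err := io.ReadAll(io.LimitReader(strings.NewReader(body), limit))
	return string(b), err
}

// readStrict uses http.MaxBytesReader, which returns a non-nil error
// when the body is larger than limit.
func readStrict(body string, limit int64) (string, error) {
	rec := httptest.NewRecorder() // stands in for the real ResponseWriter
	rc := http.MaxBytesReader(rec, io.NopCloser(strings.NewReader(body)), limit)
	defer rc.Close()
	b, err := io.ReadAll(rc)
	return string(b), err
}

func main() {
	s, err := readTruncated("abcdef", 3)
	fmt.Printf("LimitReader:    %q, err=%v\n", s, err)

	s, err = readStrict("abcdef", 3)
	fmt.Printf("MaxBytesReader: %q, err=%v\n", s, err)
}
```

Whether a silent cut-off or an explicit error is preferable depends on how much detail you want to give back to the caller.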

At this point we can start up the server and post data to it. As a result, we should get a response that doesn’t have any real calculation results in it. The service doesn’t do anything useful at the moment but at least we can do the round trip of posting and receiving results. To test the service you can use something like this curl command :


curl \
--request POST \
--header "Content-Type: application/json" \
--data '
{
  "table": [
        { "Index": 0, "NumA": 1, "NumB": 2 },
        { "Index": 1, "NumA": 3,"NumB": 4}
    ]
}
' \
http://localhost:8080/calcs

The server’s response should look like the following :


[{"Index":0,"NumA":1,"NumB":2,"Result":0},{"Index":1,"NumA":3,"NumB":4,"Result":0}]

Now that we can send data to our server and receive a response, let’s implement the concurrent calculations.

The Worker Pool

What we’re going to do next is integrate the worker pool pattern, found on the excellent Go by Example web site (https://gobyexample.com/worker-pools), into our web service. The worker pool example uses two channels : one to schedule jobs and another to post job results to. We’ll be using this mechanism to control our jobs and results. Our channels will carry Input and Output values.

We’re first going to add a NewWorker() function to our domain package. It ranges over the jobs channel, performs a calculation whenever data arrives on that channel and publishes the result to the results channel once the calculation is complete. The NewWorker() function looks like this :


// Modeled off of :
// - https://gobyexample.com/worker-pools
//
// We will run several concurrent instances of the worker
// - jobs channel is receive-only and carries Input
// - results channel is send-only and carries Output
func NewWorker(jobs <-chan Input, results chan<- Output) {
    for input := range jobs {
        
        sum := input.NumA + input.NumB

        r := Output{
            Index:      input.Index,
            NumA:       input.NumA,
            NumB:       input.NumB,
            Result:     sum,
        }
        results <- r
    }
}

One thing to note in the code above is that the jobs channel has the input data and the results channel has the output data. The original input data is added to the Output with the sum calculation and sent on the results channel.
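Before wiring the worker into the HTTP handler, it can help to see the pool run on its own. The sketch below is a self-contained program under a few assumptions of mine: the worker function is a copy of NewWorker, runPool is a hypothetical helper that does not exist in the domain package, and both channels are sized to the number of inputs so nothing can block on a full buffer.

```go
package main

import "fmt"

type Input struct {
	Index int
	NumA  int64
	NumB  int64
}

type Output struct {
	Index  int
	NumA   int64
	NumB   int64
	Result int64
}

// worker mirrors NewWorker: it drains jobs and sends one Output per Input.
func worker(jobs <-chan Input, results chan<- Output) {
	for in := range jobs {
		results <- Output{Index: in.Index, NumA: in.NumA, NumB: in.NumB, Result: in.NumA + in.NumB}
	}
}

// runPool is a hypothetical helper: it starts numWorkers goroutines,
// feeds them every input and collects the results keyed by Index.
// numWorkers must be at least 1, otherwise the collection loop would
// wait forever.
func runPool(inputs []Input, numWorkers int) map[int]Output {
	jobs := make(chan Input, len(inputs))
	results := make(chan Output, len(inputs))

	for i := 0; i < numWorkers; i++ {
		go worker(jobs, results)
	}
	for _, in := range inputs {
		jobs <- in
	}
	close(jobs) // no more work: lets every worker's range loop end

	out := make(map[int]Output, len(inputs))
	for range inputs { // exactly one result per input
		r := <-results
		out[r.Index] = r
	}
	return out
}

func main() {
	res := runPool([]Input{{Index: 0, NumA: 1, NumB: 2}, {Index: 1, NumA: 3, NumB: 4}}, 3)
	fmt.Println(res[0].Result, res[1].Result)
}
```

Results arrive in whatever order the workers finish, which is why they are keyed by Index rather than appended to a slice.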

Now that we’ve implemented the calculations, we’re going to use this in the CalcsHandler’s ServeHTTP method and launch goroutines to do the calculations.

Here’s what the new ServeHTTP method looks like :


// ServeHTTP method is bound to the CalcsHandler struct. 
// Implementing the http.Handler interface
func (handler *CalcsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    // For more information on making a RESTful JSON API in Go see:
    // - http://thenewstack.io/make-a-restful-json-api-go/

    //The io.LimitReader function is there to protect against malicious attacks on the server
    //It prevents us from having to deal with very large request bodies...

    // LimitReader returns a Reader that reads from r but stops with EOF after
    // n bytes. We limit this to roughly 2 MB
    body, err := ioutil.ReadAll(io.LimitReader(r.Body, 2048666))
    if err != nil {
        log.Println(err)
        panic(err)
    }
    if err := r.Body.Close(); err != nil {
        log.Println(err)
        panic(err)
    }

    //The input list that the user submitted to the server
    //The ProcessJsonInput function returns a slice of Inputs
    inputCollection := domain.ProcessJsonInput(body)
    //Make sure the user submitted something...
    if len(inputCollection) <= 0 {
        http.Error(w, "{\"error\": \"empty input list submitted.\"}", 500)
        return
    }

    //Make a map of Inputs and populate it with the submitted data
    inputMap := make(map[int]domain.Input)
    for k, v := range inputCollection {
       inputMap [k] = v
    }

    //The number of jobs we have to execute is the count of elements in our input map
    numberOfJobs := len(inputMap)
    numberOfWorkers := 100

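    //Buffered channels, sized to hold every job and every result so that
    //neither the sends below nor the workers can block on a full buffer
    jobs := make(chan domain.Input, numberOfJobs)     //Channel to send in the jobs
    results := make(chan domain.Output, numberOfJobs) //Channel to receive job results

    //The counter is named i so it does not shadow the ResponseWriter w
    for i := 1; i <= numberOfWorkers; i++ {
        go domain.NewWorker(jobs, results)
    }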

    for j := 0; j <= numberOfJobs-1; j++ {
        job := inputMap[j]
        jobs <- job
    }

    //close the jobs channel indicating that this is all the
    //work we have
    close(jobs)

    //The res map will be used to collect results of our workers...
    res := make(map[int]domain.Output)

    //Collect worker results
    for a := 0; a <= numberOfJobs-1; a++ {
        r := <-results
        res [r.Index] = r
    }
    close(results)

    //Set response header information
    w.Header().Set("Content-Type", "application/json; charset=UTF-8")
    w.WriteHeader(http.StatusCreated)

    //Create json with all results...
    response, err := domain.GenerateJsonOutput(res)
    if err != nil {
        log.Println(err)
        panic(err)
    }

    //Fprint rather than Fprintf : the response is data, not a format string
    fmt.Fprint(w, string(response))
}

The important part of the code above is the go domain.NewWorker(jobs, results) line which starts a calculation in a goroutine. The number of goroutines we launch is controlled by the numberOfWorkers variable. These goroutines will use the jobs and results channels to get new jobs and return results.
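The handler counts results to know when it is done. Another common way to coordinate a pool, sketched below, is to track the workers with a sync.WaitGroup and close the results channel once they have all finished, so the collector can simply range over it. This is a variation I find useful, not the code from this post. The sumAll function name is an assumption, and the sketch also caps the number of workers at the number of jobs so a two-element request does not start a hundred goroutines.

```go
package main

import (
	"fmt"
	"sync"
)

type Input struct {
	Index int
	NumA  int64
	NumB  int64
}

type Output struct {
	Index  int
	NumA   int64
	NumB   int64
	Result int64
}

// sumAll is a hypothetical helper: it adds every pair concurrently using
// at most maxWorkers goroutines and returns the results keyed by Index.
func sumAll(inputs []Input, maxWorkers int) map[int]Output {
	numWorkers := maxWorkers
	if len(inputs) < numWorkers {
		numWorkers = len(inputs) // never start more workers than jobs
	}
	if numWorkers < 1 {
		numWorkers = 1 // always keep one worker so the feeder can finish
	}

	jobs := make(chan Input)     // unbuffered: the feeder goroutine paces itself
	results := make(chan Output) // unbuffered: drained by the range loop below

	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for in := range jobs {
				results <- Output{Index: in.Index, NumA: in.NumA, NumB: in.NumB, Result: in.NumA + in.NumB}
			}
		}()
	}

	// Feed the jobs from a separate goroutine so collection can start right away.
	go func() {
		for _, in := range inputs {
			jobs <- in
		}
		close(jobs)
	}()

	// Close results once every worker has returned; this ends the range below.
	go func() {
		wg.Wait()
		close(results)
	}()

	out := make(map[int]Output, len(inputs))
	for r := range results {
		out[r.Index] = r
	}
	return out
}

func main() {
	res := sumAll([]Input{{Index: 0, NumA: 1, NumB: 2}, {Index: 1, NumA: 3, NumB: 4}}, 100)
	fmt.Println(res[0].Result, res[1].Result)
}
```

Because the feeder runs in its own goroutine, this version needs no buffer sizing at all, at the cost of a little more coordination code.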

Calling the service with curl like this :


curl \
--request POST \
--header "Content-Type: application/json" \
--data '
{
  "table": [
        { "Index": 0, "NumA": 1, "NumB": 2 },
        { "Index": 1, "NumA": 3,"NumB": 4}
    ]
}
' \
http://localhost:8080/calcs

This should now give us the following response. Notice that we have a calculation result for each element in our array.


[{"Index":0,"NumA":1,"NumB":2,"Result":3},{"Index":1,"NumA":3,"NumB":4,"Result":7}]

The Guzzle Client

This last section shows how you can call the calculation web service from PHP using the Guzzle client. The PHP code below is very simple but illustrates the important parts of posting JSON to our service and using the results. In the real world you might see some HTML, perhaps a table, be generated with the web service call results.



<?php
require 'vendor/autoload.php';
use GuzzleHttp\Client;

$json = '{"table": [{ "Index": 0, "NumA": 1, "NumB": 2 },{ "Index": 1, "NumA": 3,"NumB": 4}]}';

$client = new Client();
$url = 'http://localhost:8080/calcs';

//post json to server
$response = $client->post($url, ['body' => $json]);

//echo the response
echo ($response->getBody());

Conclusion

There you have it: A web service written in Go that performs concurrent calculations. In addition, the service is called from a PHP application. This post was fun to write. I hope that what I’ve learned in Go this week is useful to someone.

Source files for examples in this post : GitHub Repository
