Saturday, March 19, 2016

Saving page loads to InfluxDB through NATS and showing it as a graph with c3.js

I wanted to try out updating graphs in real time, and at the same time try something new. I've heard good things about NATS and InfluxDB, so these were my preferred choices. I already have a website, www.uutispuro.fi, which I'm also using here as the source of the page loads.

Stack used

  • c3.js - a JavaScript graphing library built on top of d3.js
  • NATS - a fast message broker
  • InfluxDB - a time series database

Installing

Installing NATS and InfluxDB was really easy. For NATS, download the binary and run it. For InfluxDB, there are apt-get installation instructions at https://influxdata.com/downloads.

The code

To get the data into the database, I first needed to publish a message to NATS for each page load. The client is easy to use. For the data to be saved I chose the server's IP address, so that I can make separate graphs for each server in the future. The URI is the path that was requested; it can be used to show which pages are read the most. The user agent is also an interesting field of information. The time is used for the time-based query from the database.

// Build the JSON payload by hand and publish it to the "click" subject.
// The variable is named ts so it doesn't shadow the time package.
uri := "\"uri\":\"" + c.Request().URI() + "\""
userAgent := "\"userAgent\":\"" + c.Request().Header().Get("user-agent") + "\""
localIP := "\"localIP\":\"" + nats.LocalIP + "\""
ts := "\"time\":\"" + time.Now().UTC().String() + "\""
nats.NatsConn.Publish("click", []byte("{"+ts+", "+uri+", "+userAgent+", "+localIP+"}"))
if err := next.Handle(c); err != nil {
  c.Error(err)
}

And then, in the plotter project, I need to listen to NATS.

func GetClicks() {
  NewInflux()
  nc, err := nats.Connect("nats://192.168.0.5:4222",
    nats.MaxReconnects(60),
    nats.ReconnectWait(2*time.Second))
  if err != nil {
    fmt.Println(err)
    return
  }
  defer nc.Close()
  ch := make(chan *nats.Msg, 128)
  _, err = nc.ChanSubscribe("click", ch)
  if err != nil {
    fmt.Println(err)
    return
  }

  for {
    msg := <-ch
    var data map[string]interface{}
    if err := json.Unmarshal(msg.Data, &data); err != nil {
      // a malformed message shouldn't kill the whole subscriber loop
      fmt.Println(err)
      continue
    }
    Save(data)
  }
}

And save the data to the database.

func Save(click map[string]interface{}) {
  bp, err := client.NewBatchPoints(client.BatchPointsConfig{
    Database:  influx.DbName,
    Precision: "s",
  })
  if err != nil {
    log.Println(err)
    return
  }

  // The layout matches the format produced by time.Now().UTC().String().
  stamp, err := time.Parse("2006-01-02 15:04:05 -0700 MST", click["time"].(string))
  if err != nil {
    log.Println(err)
    return
  }
  pt, err := client.NewPoint("click", map[string]string{}, click, stamp)
  if err != nil {
    log.Println(err)
    return
  }
  bp.AddPoint(pt)
  if err = influx.Conn.Write(bp); err != nil {
    log.Println(err)
  }
}

Now to the exciting part. We have the data in the database, and it's easy to fetch it from there.

func GetList() map[string]int64 {
  // A Go string literal can't span lines, so the query is concatenated.
  q := client.NewQuery("select count(localIP) from click "+
    "where time > now() - 1h "+
    "group by time(1m) fill(0)",
    "plotter", "s")
  fields := make(map[string]int64)
  response, err := influx.Conn.Query(q)
  if err != nil {
    fmt.Println(err)
    return fields
  }
  if response.Error() != nil {
    fmt.Println(response.Error())
    return fields
  }
  for _, result := range response.Results {
    if len(result.Series) == 0 {
      continue
    }
    for _, value := range result.Series[0].Values {
      id := strconv.FormatInt(convertType(value[0]), 10)
      fields[id] = convertType(value[1])
    }
  }
  return fields
}

What do we do with the data? We'll send it to the browser over a websocket every second.

e.Get("/ws", standard.WrapHandler(websocket.Handler(func(ws *websocket.Conn) {
  for range time.Tick(time.Second) {
    clicks, err := json.Marshal(app.GetList())
    if err != nil {
      fmt.Println(err)
    }
    err = websocket.Message.Send(ws, string(clicks))
    if err != nil {
      fmt.Printf("client closed connection %s\n", err)
      break
    }
  }
})))

In the browser we open a websocket connection and listen for messages.

window.onload = function() {
  writeState("document loaded");
  if ('WebSocket' in window) {
    var connection = new WebSocket('ws://uutispuro.fi:7000/ws');
    connection.onopen = function() {
      writeState("connection open");
    }
    connection.onclose = function() {
      writeState("connection closed");
    }
    connection.onerror = function(error) {
      writeState("error: " + error);
    }
    connection.onmessage = function(e) {
      var msg = JSON.parse(e.data);
      writeState("got a message");
      connection.send("ack");
      var arr = ['per minute'];
      Object.keys(msg).map(function(key, i) {
        if (i == 0) {
          arr.push(0);
        } else {
          arr.push(msg[key]);
        }
      });
      chart.load({
        columns: [
          arr
        ]
      });
    }
  } else {
    writeState("Your browser does not support websockets.");
  }
}

And draw the graph with the data sent from the server.

var chart = c3.generate({
  data: {
    columns: [
      ['per minute', 0],
    ],
    type: 'bar'
  },
  bar: {
    width: {
      ratio: 1.0
    }
  },
  axis: {
    x: {
      label: 'minutes',
      inverted: true
    },
    y: {
      label: 'page loads'
    }
  }
});


Sunday, March 6, 2016

Simple Go balancer for Kubernetes Raspberry Pi cluster

I made a really simple load balancer/proxy to sit in front of Kubernetes and share traffic between the Raspberry Pis. There are many of these available, and a lot better ones too, but I wanted to find out for myself what it takes to make a load balancer. The previous post was about setting up the cluster.

The load balancer has only two handlers, one for socket.io and one for everything else.

http.HandleFunc("/socket.io/", websocketProxy)
http.HandleFunc("/", proxy)

Ideally there would be only one handler for both, but Socket.io with websocket upgrades needs something extra. I would also have liked to log more about the Socket.io traffic, but I don't know how (is it even possible?).

Sharing traffic between the addresses is done randomly:

func random(min, max int) int {
  return rand.Intn(max-min) + min
}

and usage:

urls[random(0, 3)]
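Neither proxy handler is shown in the post; as a rough sketch, the plain proxy could be built on the standard library's httputil (the backend addresses are hardcoded here just for the sketch, and mustParse is my own helper):

```go
package main

import (
	"fmt"
	"math/rand"
	"net/http"
	"net/http/httputil"
	"net/url"
)

var urls = []*url.URL{
	mustParse("http://192.168.0.21:1300"),
	mustParse("http://192.168.0.22:1300"),
	mustParse("http://192.168.0.23:1300"),
}

func mustParse(raw string) *url.URL {
	u, err := url.Parse(raw)
	if err != nil {
		panic(err)
	}
	return u
}

func random(min, max int) int {
	return rand.Intn(max-min) + min
}

// proxy forwards each request to a randomly picked backend.
func proxy(w http.ResponseWriter, r *http.Request) {
	target := urls[random(0, len(urls))]
	httputil.NewSingleHostReverseProxy(target).ServeHTTP(w, r)
}

func main() {
	// Wiring it up would be:
	//   http.HandleFunc("/", proxy)
	//   http.ListenAndServe(":7000", nil)
	// Here we just show which backend a request would hit.
	fmt.Println(urls[random(0, len(urls))])
}
```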

It's in use now, but far from ready. First of all, I'd like to get the addresses from Kubernetes automatically. For now, it relies on an environment variable:

export BALANCED_URLS=http://[192.168.0.21]:1300,http://[192.168.0.22]:1300,http://[192.168.0.23]:1300

Secondly, if and when one of the endpoints is down, the balancer shouldn't try to use that endpoint at all. I also have no idea how much traffic it can handle; that would be nice to know :)
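Skipping dead endpoints could start from a simple background health check. A sketch (all names here are my own, not from the balancer code):

```go
package main

import (
	"fmt"
	"net/http"
	"sync"
	"time"
)

// healthChecker keeps a thread-safe view of which backends are alive.
type healthChecker struct {
	mu    sync.RWMutex
	alive map[string]bool
}

func newHealthChecker(urls []string) *healthChecker {
	h := &healthChecker{alive: make(map[string]bool)}
	for _, u := range urls {
		h.alive[u] = true // assume up until a probe fails
	}
	return h
}

// probe marks each backend dead or alive based on a quick GET.
func (h *healthChecker) probe() {
	h.mu.Lock()
	defer h.mu.Unlock()
	client := http.Client{Timeout: 2 * time.Second}
	for u := range h.alive {
		resp, err := client.Get(u)
		h.alive[u] = err == nil && resp.StatusCode < 500
		if err == nil {
			resp.Body.Close()
		}
	}
}

// aliveURLs returns the backends the balancer should pick from.
func (h *healthChecker) aliveURLs() []string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	var out []string
	for u, ok := range h.alive {
		if ok {
			out = append(out, u)
		}
	}
	return out
}

func main() {
	h := newHealthChecker([]string{"http://192.168.0.21:1300"})
	// Probe every 10 seconds in the background:
	go func() {
		for range time.Tick(10 * time.Second) {
			h.probe()
		}
	}()
	fmt.Println(h.aliveURLs())
}
```

The random pick would then use len(aliveURLs()) instead of a hardcoded 3.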

Github: https://github.com/jelinden/go-loadbalancer

Saturday, March 5, 2016

Raspberry Pi cluster with Kubernetes

I got my first Raspberry Pi as a gift when I was on an Architecting on AWS course. I've mostly just been playing with it, but I wanted to use it for something useful. Then I read a blog post about making a cluster out of them and got really interested.

Creating a Raspberry Pi cluster running Kubernetes, the shopping list (Part 1) and Creating a Raspberry Pi cluster running Kubernetes, the installation (Part 2).

I ordered three more Raspberry Pis (so I have one less than in the original recipe) and started building a cluster. Initially I was thinking about some new database cluster, but then switched to scaling Uutispuro, an RSS feed title lister that I've been making for quite some time now. It uses MongoDB from outside of Kubernetes.

Building the actual cluster went nicely with the help of the blog posts I followed. I also set up ntpd and used fi_FI.UTF-8 as the locale on each Pi. Each worker node connected nicely to the master; only the last one had a hiccup of some kind (it got stuck in “NotReady”), but a restart helped.

Docker Hub

It was a surprise to me that I had to use Docker Hub to get the Docker image into Kubernetes. It should at least be possible to use the image I'm building directly.

# build a docker image
docker build -t jelinden/newsfeedreader:0.2 .
# push to docker hub
docker push jelinden/newsfeedreader:0.2
# get the image from docker hub, expose port 1300 and run it on one of the Pis
kubectl run newsfeedreader --image=docker.io/jelinden/newsfeedreader:0.2 --port=1300

You can view what's going on inside the cluster with

kubectl get events -w

And then you can scale it up to run on three instances:

kubectl scale rc newsfeedreader --replicas=3

I couldn't get Docker Hub to work with a private repo; you can read more about it in Kubernetes PullImageError using Docker Hub with a private image. Luckily for me, it doesn't matter whether the image is public or private.

What next?

Worker node ports are only accessible from the master node, so I have to add a load balancer to the setup. Adding haproxy or nginx would be too easy, so I'll make my own with Go.