lauantai 19. maaliskuuta 2016

Saving page loads to InfluxDB through NATS and showing it as a graph with c3.js

I wanted to try out updating graphs in real time, and at the same time try something new. I’ve heard good things about Nats and InfluxDB so these were my preferred choises. I already have a web site www.uutispuro.fi, which I'm also using.

Stack used

  • c3.js - made on top of d3.js, a javascript library for graphs
  • Nats - A fast message broker
  • InfluxDB - Time series database

Installing

First installing Nats and InfluxDB were really easy. Download Nats binary and run it. For InfluxDB there are apt-get installation instructions at https://influxdata.com/downloads.

The code

To get the data to the database, I first needed to send the messages for each page load to Nats. The client is easy to use. For data to be saved I chose the ip address of the server so that I can make own graphs for each of them in the future. URI is the path which was asked, this can be used to show the pages which are read the most. User agent is also an interesting field of information. Time is used for the time based query from the database.

uri := "\"uri\":\"" + c.Request().URI() + "\""
userAgent := "\"userAgent\":\"" + c.Request().Header().Get("user-agent") + "\""
localIP := "\"localIP\":\"" + nats.LocalIP + "\""
time := "\"time\":\"" + time.Now().UTC().String() + "\""
nats.NatsConn.Publish("click", []byte("{"+time+", "+uri+", "+userAgent+", "+localIP+"}"))
if err := next.Handle(c); err != nil {
  c.Error(err)
}

And then in plotter project, I need to listen to Nats.

func GetClicks() {
  NewInflux()
  nc, err := nats.Connect("nats://192.168.0.5:4222", 
    nats.MaxReconnects(60), 
    nats.ReconnectWait(2*time.Second))
  if err != nil {
      fmt.Println(err)
    return
  }
  defer nc.Close()
  ch := make(chan *nats.Msg, 128)
  _, err = nc.ChanSubscribe("click", ch)
  if err != nil {
    fmt.Println(err)
    return
  }

  for {
    msg := <-ch
    var data map[string]interface{}
    if err := json.Unmarshal(msg.Data, &data); err != nil {
      panic(err)
    }
    Save(data)
  }
}

And save the data to the database.

func Save(click map[string]interface{}) {
  bp, err := client.NewBatchPoints(client.BatchPointsConfig{
    Database:  influx.DbName,
    Precision: "s",
  })
  if err != nil {
    log.Println(err)
  }

  stamp, _ := time.Parse("2006-01-02 15:04:05 -0700 MST", click["time"].(string))
  pt, err := client.NewPoint("click", map[string]string{}, click, stamp)
  if err != nil {
    log.Println(err)
  }
  bp.AddPoint(pt)
  err = influx.Conn.Write(bp)
  if err != nil {
    log.Println(err)
  }
}

Now to the exiting part. We have the data in the database and it's easy to fetch it from there.

func GetList() map[string]int64 {
  q := client.NewQuery("select count(localIP) from click 
      where time > now() - 1h 
      group by time(1m) fill(0)",
      "plotter", "s")
  var fields = make(map[string]int64)
  response, err := influx.Conn.Query(q)
  if err != nil || response.Error() != nil {
    fmt.Println(err)
  } else {
    for _, result := range response.Results {
      for _, value := range result.Series[0].Values {
        id := strconv.FormatInt(convertType(value[0]), 10)
        val := convertType(value[1])
        fields[id] = val
      }
    }
  }
  return fields
}

What do we do with the data? We'll send it with websockets to the browser every second.

e.Get("/ws", standard.WrapHandler(websocket.Handler(func(ws *websocket.Conn) {
  for range time.Tick(time.Second) {
    clicks, err := json.Marshal(app.GetList())
      if err != nil {
        fmt.Println(err)
      }
      err = websocket.Message.Send(ws, string(clicks))
      if err != nil {
        fmt.Printf("client closed connection %s\n", err)
        break
      }
    }
})))

In the browser we take a websocket connection and listen.

window.onload = function() {
  writeState("document loaded");
  if ('WebSocket' in window) {
    var connection = new WebSocket('ws://uutispuro.fi:7000/ws');
    connection.onopen = function() {
      writeState("connection open");
    }
    connection.onclose = function() {
      writeState("connection closed");
    }
    connection.onerror = function(error) {
      writeState("error: " + error);
    }
    connection.onmessage = function(e) {
      var msg = JSON.parse(e.data);
      writeState("got a message");
      connection.send("ack");
      arr = ['per minute'];
      Object.keys(msg).map(function(key, i) {
        if (i == 0) {
          arr.push(0);
        } else {
          arr.push(msg[key]);
        }
      });
      chart.load({
        columns: [
          arr
        ]
      });
    }
  } else {
    writeState("You're browser does not support websockets.");
  }
}

And draw the graph with the data sent from the server.

var chart = c3.generate({
  data: {
    columns: [
      ['per minute', 0],
    ],
    type: 'bar'
  },
  bar: {
    width: {
      ratio: 1.0
    }
  },
  axis: {
    x: {
      label: 'minutes',
      inverted: true
    },
    y: {
      label: 'page loads'
    }
  }
});

Links

Ei kommentteja:

Lähetä kommentti

Huomaa: vain tämän blogin jäsen voi lisätä kommentin.