Multiple changes. v0.3.0
authorMahlon E. Smith <mahlon@martini.nu>
Sat, 25 Jul 2020 15:05:26 -0700
changeset 19 1f09cfb560e0
parent 18 a135fdaed52b
child 20 8ab012d41a9c
Multiple changes. - Clean up various nim compiler warnings. ... except ObservableStores. https://forum.nim-lang.org/t/6442#39738 - Update documentation for Netdata v1.23's "exporting" module. - TCP connections to netdata where dropped by default. Expose this behavior as a toggle, and change the default to leave the child process (and the tcp socket) open. - Bump to v0.3.0.
Makefile
README.md
netdata_tsrelay.nim
--- a/Makefile	Mon Nov 19 12:05:51 2018 -0800
+++ b/Makefile	Sat Jul 25 15:05:26 2020 -0700
@@ -3,12 +3,12 @@
 
 default: release
 
-debug: ${FILES}
-	nim --assertions:on --nimcache:.cache c ${FILES}
+autobuild:
+	find . -type f -iname \*.nim | entr -c make development
 
 development: ${FILES}
 	# can use gdb with this...
-	nim --debugInfo --linedir:on -d:testing -d:nimTypeNames --nimcache:.cache c ${FILES}
+	nim --debugInfo --assertions:on --linedir:on -d:testing -d:nimTypeNames --nimcache:.cache c ${FILES}
 
 debugger: ${FILES}
 	nim --debugger:on --nimcache:.cache c ${FILES}
--- a/README.md	Mon Nov 19 12:05:51 2018 -0800
+++ b/README.md	Sat Jul 25 15:05:26 2020 -0700
@@ -58,20 +58,26 @@
 ### Netdata
 
 You'll likely want to pare down what netdata is sending.  Here's an
-example configuration for `netdata.conf` -- season this to taste (what
+example configuration for `exporting.conf` -- season this to taste (what
 charts to send and frequency.)
 
+Note: This example uses the "exporting" module introduced in
+Netdata v1.23.  If your netdata is older than that, you'll be using
+the deprecated "backend" instead in the main `netdata.conf` file.
+
 ```
-[backend]
-    hostname           = your-hostname
-    enabled            = yes
-    type               = json
-    data source        = average
-    destination        = machine-where-netdata-tsrelay-lives:14866
-    prefix             = n
-    update every       = 60
-    buffer on failures = 5
-    send charts matching = !cpu.cpu* !ipv6* !users* nfs.rpc net.* net_drops.* net_packets.* !system.interrupts* system.* disk.* disk_space.* disk_ops.* mem.*
+[exporting:global]
+	enabled = yes
+
+[json:timescale]
+	hostname             = your-hostname
+	enabled              = yes
+	data source          = average
+	destination          = localhost:14866
+	prefix               = netdata
+	update every         = 10
+	buffer on failures   = 10
+	send charts matching = !cpu.cpu* !ipv6* !users.* nfs.rpc net.* net_drops.* net_packets.* !system.interrupts* system.* disk.* disk_space.* disk_ops.* mem.*
 ```
 
 
@@ -82,17 +88,24 @@
 
   * [-q|--quiet]:    Quiet mode.  No output at all. Ignored if -d is supplied.
   * [-d|--debug]:    Debug mode.  Show incoming data.
-  * [--dbopts]:      PostgreSQL connection information.  (See below for more details.)
+  * [-D|--dropconn]: Drop the TCP connection to netdata between samples.
+                     This may be more efficient depending on your environment and
+                     number of clients.  Defaults to false.
+  * [-o|--dbopts]:   PostgreSQL connection information.  (See below for more details.)
   * [-h|--help]:     Display quick help text.
-  * [--listen-addr]: A specific IP address to listen on.  Defaults to **INADDR_ANY**.
-  * [--listen-port]: The port to listen for netdata JSON streams.
+  * [-a|--listen-addr]: A specific IP address to listen on.  Defaults to **INADDR_ANY**.
+  * [-p|--listen-port]: The port to listen for netdata JSON streams.
                      Default is **14866**.
+  * [-P|--persistent]: Don't disconnect from the database between samples. This may be
+                     more efficient with a small number of clients, when not using a
+                     pooler, or with a very high sample size/rate.  Defaults to false.
   * [-T|--dbtable]:  Change the table name to insert to.  Defaults to **netdata**.
   * [-t|--timeout]:  Maximum time in milliseconds to wait for data.  Slow
                      connections may need to increase this from the default **500** ms.
   * [-v|--version]:  Show version.
 
 
+
 **Notes**
 
 Nim option parsing might be slightly different than what you're used to.
@@ -112,10 +125,9 @@
 ... which uses the default PostgreSQL port, and connects as the running
 user.
 
-Reference the [PostgreSQL
-Documentation](https://www.postgresql.org/docs/current/static/libpq-conn
-ect.html#LIBPQ-PARAMKEYWORDS) for all available options (including how
-to store passwords in a separate file, enable SSL mode, etc.)
+Reference the [PostgreSQL Documentation](https://www.postgresql.org/docs/current/static/libpq-connect.html#LIBPQ-PARAMKEYWORDS)
+for all available options (including how to store passwords in a
+separate file, enable SSL mode, etc.)
 
 
 ### Daemonizing
--- a/netdata_tsrelay.nim	Mon Nov 19 12:05:51 2018 -0800
+++ b/netdata_tsrelay.nim	Sat Jul 25 15:05:26 2020 -0700
@@ -1,6 +1,6 @@
 # vim: set et nosta sw=4 ts=4 :
 #
-# Copyright (c) 2018, Mahlon E. Smith <mahlon@martini.nu>
+# Copyright (c) 2018-2020, Mahlon E. Smith <mahlon@martini.nu>
 # All rights reserved.
 # Redistribution and use in source and binary forms, with or without
 # modification, are permitted provided that the following conditions are met:
@@ -34,29 +34,60 @@
     math,
     nativesockets,
     net,
-    os,
     parseopt,
     posix,
     strutils,
+    strformat,
     tables,
     terminal,
     times
 
 
 const
-    VERSION = "v0.2.0"
+    VERSION = "v0.3.0"
     USAGE = """
-./netdata_tsrelay [-d][-h][-q][-t][-T][-v] --dbopts="[PostgreSQL connection string]" --listen-port=14866 --listen-addr=0.0.0.0
+./netdata_tsrelay [-adDhopqtTv]
+
+  -a --listen-addr:
+    The outbound IP address to listen for netdata streams.
+
+  -d --debug:
+    Debug: Show incoming and parsed data.
+
+  -D --dropconn:
+    Drop the persistent socket to netdata between samples to conserve
+    local resources.  This may be helpful with a large number of clients.
+    Defaults to false.
+
+  -h --help:
+    Help.  You're lookin' at it.
 
-  -q: Quiet mode.  No output at all.  Ignored if -d is supplied.
-  -d: Debug: Show incoming and parsed data.
-  -v: Display version number.
-  -T: Change the destination table name from the default 'netdata'.
-  -t: Alter the maximum time (in ms) an open socket waits for data.  Default: 500ms.
-  -h: Help.  You're lookin' at it.
+  -o --dbopts:
+    The PostgreSQL connection string parameters.
+    The default connection string is:
+      "host=localhost dbname=netdata application_name=netdata-tsrelay"
+
+  -p --listen-port:
+    Change the listening port from the default (14866).
+
+  -P --persistent:
+    Don't disconnect from the database between samples.  This may be
+    more efficient with a small number of clients, when not using a
+    pooler, or with a very high sample size/rate.  Defaults to false.
 
-The default connection string is:
-  "host=localhost dbname=netdata application_name=netdata-tsrelay"
+  -q --quiet:
+    Quiet mode.  No output at all.  Ignored if -d is supplied.
+
+  -T --dbtable:
+    Change the destination table name from the default (netdata).
+
+  -t --timeout:
+    Alter the maximum time (in ms) an open socket waits for data
+    before processing the sample.  Default: 500ms.
+
+  -v --verbose:
+    Display version number.
+
     """
     INSERT_SQL = """
     INSERT INTO $1
@@ -70,6 +101,8 @@
     Config = object of RootObj
         dbopts:      string  # The postgresql connection parameters.  (See https://www.postgresql.org/docs/current/static/libpq-connect.html)
         dbtable:     string  # The name of the table to write to.
+        dropconn:    bool    # Close the TCP connection between samples.
+        persistent:  bool    # Don't close the database handle between samples.
         listen_port: int     # The port to listen for incoming connections.
         listen_addr: string  # The IP address listen for incoming connections.  Defaults to inaddr_any.
         verbose:     bool    # Be informative
@@ -77,6 +110,14 @@
         insertsql:   string  # The SQL insert string after interpolating the table name.
         timeout:     int     # How long to block, waiting on connection data.
 
+
+type
+    NetdataClient = ref object
+        sock: Socket    # The raw socket fd
+        address: string # The remote IP address
+        db: DbConn      # An optionally persistent database handle
+
+
 # Global configuration
 var conf: Config
 
@@ -91,16 +132,25 @@
     result = "\e[" & $color & 'm' & msg & "\e[0m"
 
 
-proc fetch_data( client: Socket ): string =
-    ## Netdata JSON backend doesn't send a length, so we read line by
-    ## line and wait for stream timeout to determine a "sample".
+proc fetch_data( client: NetdataClient ): string =
+    ## Netdata JSON backend doesn't send a length nor a separator
+    ## between samples, so we read line by line and wait for stream
+    ## timeout to determine what constitutes a sample.
     var buf = ""
-    try:
-        while true:
-            client.readline( buf, timeout=conf.timeout )
-            if buf != "": result = result & buf & "\n"
-    except TimeoutError:
-        return
+    while true:
+        try:
+            client.sock.readline( buf, timeout=conf.timeout )
+            if buf == "":
+                if conf.debug: echo "Client {client.address} closed socket.".fmt.hl( fgRed, bright=true )
+                quit( 1 )
+
+            result = result & buf & "\n"
+
+        except OSError:
+            quit( 1 )
+        except TimeoutError:
+            if result == "": continue 
+            return
 
 
 proc parse_data( data: string ): seq[ JsonNode ] =
@@ -147,22 +197,23 @@
         result.add( sample )
 
 
-proc write_to_database( samples: seq[ JsonNode ] ): void =
+proc write_to_database( client: NetdataClient, samples: seq[ JsonNode ] ): void =
     ## Given a sequence of json samples, write them to database.
     if samples.len == 0: return
 
-    let db = open( "", "", "", conf.dbopts )
+    if client.db.isNil:
+        client.db = open( "", "", "", conf.dbopts )
 
     try:
-        db.exec sql( "BEGIN" )
+        client.db.exec sql( "BEGIN" )
         for sample in samples:
             var
                 timestamp = sample[ "timestamp" ].get_int
                 host = sample[ "hostname" ].get_str.to_lowerascii
             sample.delete( "timestamp" )
             sample.delete( "hostname" )
-            db.exec sql( conf.insertsql ), timestamp, host, sample
-        db.exec sql( "COMMIT" )
+            client.db.exec sql( conf.insertsql ), timestamp, host, sample
+        client.db.exec sql( "COMMIT" )
     except:
         let
             e = getCurrentException()
@@ -170,10 +221,12 @@
         echo "Got exception ", repr(e), " while writing to DB: ", msg
         discard
 
-    db.close
+    if not conf.persistent:
+        client.db.close
+        client.db = nil
 
 
-proc process( client: Socket, address: string ): void =
+proc process( client: NetdataClient ): void =
     ## Do the work for a connected client within child process.
     let t0 = cpu_time()
     var raw_data = client.fetch_data
@@ -182,24 +235,25 @@
     # reconnect.  Save local resources/file descriptors
     # by closing after the send is considered complete.
     #
-    try:
-        client.close
-    except OSError:
-        return
+    if conf.dropconn:
+        try:
+            client.sock.close
+        except OSError:
+            return
 
     # Pivot the parsed data to a single JSON blob per sample time.
     var samples = parse_data( raw_data )
-    write_to_database( samples )
+    client.write_to_database( samples )
 
     if conf.verbose:
+        let cputime = cpu_time() - t0
         echo(
             hl( $(epochTime().to_int), fgMagenta, bright=true ),
             " ",
             hl( $(samples.len), fgWhite, bright=true ),
             " sample(s) parsed from ",
-            address.hl( fgYellow, bright=true ),
-            " in ", hl($( round(cpu_time() - t0, 3) ), fgWhite, bright=true), " seconds."
-            # " ", hl($(round((get_occupied_mem()/1024/1024),1)), fgWhite, bright=true), "MB memory used."
+            client.address.hl( fgYellow, bright=true ),
+            " in ", hl( "{cputime:<2.3f}".fmt, fgWhite, bright=true), " seconds."
         )
 
 
@@ -235,19 +289,23 @@
     # Wait for incoming connections, fork for each client.
     #
     while true:
-        var
-            client  = new Socket
-            address = ""
+        let client = NetdataClient.new
+        client.sock = Socket.new
 
         # Block, waiting for new connections.
-        server.acceptAddr( client, address )
+        server.acceptAddr( client.sock, client.address )
 
         if fork() == 0:
             server.close
-            client.process( address )
-            quit( 0 )
+            if conf.dropconn:
+                # "one shot" mode.
+                client.process
+                quit( 0 )
+            else:
+                # Keep the connection to netdata open.
+                while true: client.process
 
-        client.close
+        client.sock.close
         when defined( testing ): dumpNumberOfInstances()
 
 
@@ -259,11 +317,13 @@
     result = Config(
         dbopts: "host=localhost dbname=netdata application_name=netdata-tsrelay",
         dbtable: "netdata",
+        dropconn: false,
         listen_port: 14866,
         listen_addr: "0.0.0.0",
         verbose: true,
         debug: false,
         timeout: 500,
+        persistent: false,
         insertsql: INSERT_SQL % [ "netdata" ]
     )
 
@@ -281,6 +341,12 @@
                 of "debug", "d":
                     result.debug = true
 
+                of "dropconn", "D":
+                    if result.persistent:
+                        echo "Dropping TCP sockets are incompatible with persistent database connections."
+                        quit( 1 )
+                    result.dropconn = true
+
                 of "help", "h":
                     echo USAGE
                     quit( 0 )
@@ -296,11 +362,17 @@
 
                 of "dbtable", "T":
                     result.insertsql = INSERT_SQL % [ val ]
-                of "dbopts": result.dbopts = val
+                of "dbopts", "o": result.dbopts = val
 
                 of "listen-addr", "a": result.listen_addr = val
                 of "listen-port", "p": result.listen_port = val.parse_int
 
+                of "persistent", "P":
+                    if result.dropconn:
+                        echo "Persistent database connections are incompatible with dropping TCP sockets."
+                        quit( 1 )
+                    result.persistent = true
+
                 else: discard
 
         of cmdEnd: assert( false ) # shouldn't reach here ever