netdata_tsrelay.nim
changeset 19 1f09cfb560e0
parent 17 96b8799a565a
child 21 a2fe9ec4cdf2
--- a/netdata_tsrelay.nim	Mon Nov 19 12:05:51 2018 -0800
+++ b/netdata_tsrelay.nim	Sat Jul 25 15:05:26 2020 -0700
@@ -1,6 +1,6 @@
 # vim: set et nosta sw=4 ts=4 :
 #
-# Copyright (c) 2018, Mahlon E. Smith <mahlon@martini.nu>
+# Copyright (c) 2018-2020, Mahlon E. Smith <mahlon@martini.nu>
 # All rights reserved.
 # Redistribution and use in source and binary forms, with or without
 # modification, are permitted provided that the following conditions are met:
@@ -34,29 +34,60 @@
     math,
     nativesockets,
     net,
-    os,
     parseopt,
     posix,
     strutils,
+    strformat,
     tables,
     terminal,
     times
 
 
 const
-    VERSION = "v0.2.0"
+    VERSION = "v0.3.0"
     USAGE = """
-./netdata_tsrelay [-d][-h][-q][-t][-T][-v] --dbopts="[PostgreSQL connection string]" --listen-port=14866 --listen-addr=0.0.0.0
+./netdata_tsrelay [-adDhopqtTv]
+
+  -a --listen-addr:
+    The outbound IP address to listen for netdata streams.
+
+  -d --debug:
+    Debug: Show incoming and parsed data.
+
+  -D --dropconn:
+    Drop the persistent socket to netdata between samples to conserve
+    local resources.  This may be helpful with a large number of clients.
+    Defaults to false.
+
+  -h --help:
+    Help.  You're lookin' at it.
 
-  -q: Quiet mode.  No output at all.  Ignored if -d is supplied.
-  -d: Debug: Show incoming and parsed data.
-  -v: Display version number.
-  -T: Change the destination table name from the default 'netdata'.
-  -t: Alter the maximum time (in ms) an open socket waits for data.  Default: 500ms.
-  -h: Help.  You're lookin' at it.
+  -o --dbopts:
+    The PostgreSQL connection string parameters.
+    The default connection string is:
+      "host=localhost dbname=netdata application_name=netdata-tsrelay"
+
+  -p --listen-port:
+    Change the listening port from the default (14866).
+
+  -P --persistent:
+    Don't disconnect from the database between samples.  This may be
+    more efficient with a small number of clients, when not using a
+    pooler, or with a very high sample size/rate.  Defaults to false.
 
-The default connection string is:
-  "host=localhost dbname=netdata application_name=netdata-tsrelay"
+  -q --quiet:
+    Quiet mode.  No output at all.  Ignored if -d is supplied.
+
+  -T --dbtable:
+    Change the destination table name from the default (netdata).
+
+  -t --timeout:
+    Alter the maximum time (in ms) an open socket waits for data
+    before processing the sample.  Default: 500ms.
+
+  -v --verbose:
+    Display version number.
+
     """
     INSERT_SQL = """
     INSERT INTO $1
@@ -70,6 +101,8 @@
     Config = object of RootObj
         dbopts:      string  # The postgresql connection parameters.  (See https://www.postgresql.org/docs/current/static/libpq-connect.html)
         dbtable:     string  # The name of the table to write to.
+        dropconn:    bool    # Close the TCP connection between samples.
+        persistent:  bool    # Don't close the database handle between samples.
         listen_port: int     # The port to listen for incoming connections.
         listen_addr: string  # The IP address listen for incoming connections.  Defaults to inaddr_any.
         verbose:     bool    # Be informative
@@ -77,6 +110,14 @@
         insertsql:   string  # The SQL insert string after interpolating the table name.
         timeout:     int     # How long to block, waiting on connection data.
 
+
+type
+    NetdataClient = ref object
+        sock: Socket    # The raw socket fd
+        address: string # The remote IP address
+        db: DbConn      # An optionally persistent database handle
+
+
 # Global configuration
 var conf: Config
 
@@ -91,16 +132,25 @@
     result = "\e[" & $color & 'm' & msg & "\e[0m"
 
 
-proc fetch_data( client: Socket ): string =
-    ## Netdata JSON backend doesn't send a length, so we read line by
-    ## line and wait for stream timeout to determine a "sample".
+proc fetch_data( client: NetdataClient ): string =
+    ## Netdata JSON backend doesn't send a length nor a separator
+    ## between samples, so we read line by line and wait for stream
+    ## timeout to determine what constitutes a sample.
     var buf = ""
-    try:
-        while true:
-            client.readline( buf, timeout=conf.timeout )
-            if buf != "": result = result & buf & "\n"
-    except TimeoutError:
-        return
+    while true:
+        try:
+            client.sock.readline( buf, timeout=conf.timeout )
+            if buf == "":
+                if conf.debug: echo "Client {client.address} closed socket.".fmt.hl( fgRed, bright=true )
+                quit( 1 )
+
+            result = result & buf & "\n"
+
+        except OSError:
+            quit( 1 )
+        except TimeoutError:
+            if result == "": continue 
+            return
 
 
 proc parse_data( data: string ): seq[ JsonNode ] =
@@ -147,22 +197,23 @@
         result.add( sample )
 
 
-proc write_to_database( samples: seq[ JsonNode ] ): void =
+proc write_to_database( client: NetdataClient, samples: seq[ JsonNode ] ): void =
     ## Given a sequence of json samples, write them to database.
     if samples.len == 0: return
 
-    let db = open( "", "", "", conf.dbopts )
+    if client.db.isNil:
+        client.db = open( "", "", "", conf.dbopts )
 
     try:
-        db.exec sql( "BEGIN" )
+        client.db.exec sql( "BEGIN" )
         for sample in samples:
             var
                 timestamp = sample[ "timestamp" ].get_int
                 host = sample[ "hostname" ].get_str.to_lowerascii
             sample.delete( "timestamp" )
             sample.delete( "hostname" )
-            db.exec sql( conf.insertsql ), timestamp, host, sample
-        db.exec sql( "COMMIT" )
+            client.db.exec sql( conf.insertsql ), timestamp, host, sample
+        client.db.exec sql( "COMMIT" )
     except:
         let
             e = getCurrentException()
@@ -170,10 +221,12 @@
         echo "Got exception ", repr(e), " while writing to DB: ", msg
         discard
 
-    db.close
+    if not conf.persistent:
+        client.db.close
+        client.db = nil
 
 
-proc process( client: Socket, address: string ): void =
+proc process( client: NetdataClient ): void =
     ## Do the work for a connected client within child process.
     let t0 = cpu_time()
     var raw_data = client.fetch_data
@@ -182,24 +235,25 @@
     # reconnect.  Save local resources/file descriptors
     # by closing after the send is considered complete.
     #
-    try:
-        client.close
-    except OSError:
-        return
+    if conf.dropconn:
+        try:
+            client.sock.close
+        except OSError:
+            return
 
     # Pivot the parsed data to a single JSON blob per sample time.
     var samples = parse_data( raw_data )
-    write_to_database( samples )
+    client.write_to_database( samples )
 
     if conf.verbose:
+        let cputime = cpu_time() - t0
         echo(
             hl( $(epochTime().to_int), fgMagenta, bright=true ),
             " ",
             hl( $(samples.len), fgWhite, bright=true ),
             " sample(s) parsed from ",
-            address.hl( fgYellow, bright=true ),
-            " in ", hl($( round(cpu_time() - t0, 3) ), fgWhite, bright=true), " seconds."
-            # " ", hl($(round((get_occupied_mem()/1024/1024),1)), fgWhite, bright=true), "MB memory used."
+            client.address.hl( fgYellow, bright=true ),
+            " in ", hl( "{cputime:<2.3f}".fmt, fgWhite, bright=true), " seconds."
         )
 
 
@@ -235,19 +289,23 @@
     # Wait for incoming connections, fork for each client.
     #
     while true:
-        var
-            client  = new Socket
-            address = ""
+        let client = NetdataClient.new
+        client.sock = Socket.new
 
         # Block, waiting for new connections.
-        server.acceptAddr( client, address )
+        server.acceptAddr( client.sock, client.address )
 
         if fork() == 0:
             server.close
-            client.process( address )
-            quit( 0 )
+            if conf.dropconn:
+                # "one shot" mode.
+                client.process
+                quit( 0 )
+            else:
+                # Keep the connection to netdata open.
+                while true: client.process
 
-        client.close
+        client.sock.close
         when defined( testing ): dumpNumberOfInstances()
 
 
@@ -259,11 +317,13 @@
     result = Config(
         dbopts: "host=localhost dbname=netdata application_name=netdata-tsrelay",
         dbtable: "netdata",
+        dropconn: false,
         listen_port: 14866,
         listen_addr: "0.0.0.0",
         verbose: true,
         debug: false,
         timeout: 500,
+        persistent: false,
         insertsql: INSERT_SQL % [ "netdata" ]
     )
 
@@ -281,6 +341,12 @@
                 of "debug", "d":
                     result.debug = true
 
+                of "dropconn", "D":
+                    if result.persistent:
+                        echo "Dropping TCP sockets are incompatible with persistent database connections."
+                        quit( 1 )
+                    result.dropconn = true
+
                 of "help", "h":
                     echo USAGE
                     quit( 0 )
@@ -296,11 +362,17 @@
 
                 of "dbtable", "T":
                     result.insertsql = INSERT_SQL % [ val ]
-                of "dbopts": result.dbopts = val
+                of "dbopts", "o": result.dbopts = val
 
                 of "listen-addr", "a": result.listen_addr = val
                 of "listen-port", "p": result.listen_port = val.parse_int
 
+                of "persistent", "P":
+                    if result.dropconn:
+                        echo "Persistent database connections are incompatible with dropping TCP sockets."
+                        quit( 1 )
+                    result.persistent = true
+
                 else: discard
 
         of cmdEnd: assert( false ) # shouldn't reach here ever