Multiple changes.
authorMahlon E. Smith <mahlon@laika.com>
Sun, 18 Feb 2018 18:16:37 -0800
changeset 8 1ef3f2d6d10e
parent 7 c0bcf3bea772
child 9 aa9d537f7067
Multiple changes. - There's still a delay somewhere with threading in the socket read() that impacts simultaneous client connections. After a bunch of experimenting with Channel message passing, rip it all out in favor of a simple fork()ing server. - Remove the color option, just check stdout for a tty instead to make it automatic. - Better error handling for malformed packets/samples.
netdata_tsrelay.nim
--- a/netdata_tsrelay.nim	Thu Feb 15 10:29:37 2018 -0800
+++ b/netdata_tsrelay.nim	Sun Feb 18 18:16:37 2018 -0800
@@ -1,4 +1,5 @@
-# vim: set et nosta sw=4 ts=4 ft=nim : 
+# vim: set et nosta sw=4 ts=4 :
+# im: set et nosta sw=4 ts=4 ft=nim : 
 #
 # Copyright (c) 2018, Mahlon E. Smith <mahlon@martini.nu>
 # All rights reserved.
@@ -36,11 +37,11 @@
     net,
     os,
     parseopt2,
+    posix,
     strutils,
     tables,
     terminal,
-    times,
-    threadpool
+    times
 
 
 const
@@ -49,7 +50,6 @@
 ./netdata_tsrelay [-q][-v][-h] --dbopts="[PostgreSQL connection string]" --listen-port=14866 --listen-addr=0.0.0.0
 
   -q: Quiet mode.  No output at all.  Ignored if -d is supplied.
-  -c: Suppress ANSI color output.
   -d: Debug: Show incoming and parsed data.
   -v: Display version number.
   -h: Help.  You're lookin' at it.
@@ -72,28 +72,14 @@
         listen_addr: string  # The IP address listen for incoming connections.  Defaults to inaddr_any.
         verbose:     bool    # Be informative
         debug:       bool    # Spew out raw data
-        use_color:   bool    # Pretty things up a little, probably want to disable this if debugging
 
-
-# The global config object
-#
-# FIXME:  Rather than pass this all over the
-# place, consider channels and createThread instead of spawn.
-#
-var conf = Config(
-    dbopts: "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay",
-    listen_port: 14866,
-    listen_addr: "0.0.0.0",
-    verbose: true,
-    debug: false,
-    use_color: true
-)
-
+# Global configuration
+var conf: Config
 
 proc hl( msg: string, fg: ForegroundColor, bright=false ): string =
     ## Quick wrapper for color formatting a string, since the 'terminal'
     ## module only deals with stdout directly.
-    if not conf.use_color: return msg
+    if not isatty(stdout): return msg
 
     var color: BiggestInt = ord( fg )
     if bright: inc( color, 60 )
@@ -103,55 +89,91 @@
 proc fetch_data( client: Socket ): string =
     ## Netdata JSON backend doesn't send a length, so we read line by
     ## line and wait for stream timeout to determine a "sample".
+    var buf: string = nil
     try:
-        result = client.recv_line( timeout=500 ) & "\n"
-        while result != "":
-            result = result & client.recv_line( timeout=500 ) & "\n"
+        result = client.recv_line( timeout=500 )
+        if result != "" and not result.is_nil: result = result & "\n"
+        while buf != "":
+            buf = client.recv_line( timeout=500 )
+            if buf != "" and not buf.is_nil: result = result & buf & "\n"
     except TimeoutError:
         discard
 
 
-proc parse_data( data: string, conf: Config ): Table[ BiggestInt, JsonNode ] =
-    ## Given a raw +data+ string, parse JSON and return a table of
-    ## JSON samples ready for writing, keyed by timestamp. Netdata can
-    ## buffer multiple samples in one batch.
-    if data == "": return
+proc parse_data( data: string ): seq[ JsonNode ] =
+    ## Given a raw +data+ string, parse JSON and return a sequence
+    ## of JSON samples. Netdata can buffer multiple samples in one batch.
+    result = @[]
+    if data == "" or data.is_nil: return
 
     # Hash of sample timeperiods to pivoted json data
-    result = init_table[ BiggestInt, JsonNode ]()
+    var pivoted_data = init_table[ BiggestInt, JsonNode ]()
 
     for sample in split_lines( data ):
-        if conf.debug: echo sample.hl( fgBlack, bright=true )
-        if sample.len == 0: continue
+        if sample == "" or sample.is_nil: continue
+        #if conf.debug: echo sample.hl( fgBlack, bright=true )
 
         var parsed: JsonNode
         try:
             parsed = sample.parse_json
         except JsonParsingError:
-            discard
             if conf.debug: echo hl( "Unable to parse sample line: " & sample.hl(fgRed, bright=true), fgRed )
+            continue
+        if parsed.kind != JObject: return
 
         # Create or use existing Json object for modded data.
         #
         var pivot: JsonNode
-        let key = parsed["timestamp"].get_num
+        try:
+            let key = parsed["timestamp"].get_num
+
+            if pivoted_data.has_key( key ):
+                pivot = pivoted_data[ key ]
+            else:
+                pivot = newJObject()
+                pivoted_data[ key ] = pivot
 
-        if result.has_key( key ):
-            pivot = result[ key ]
-        else:
-            pivot = newJObject()
-            result[ key ] = pivot
+            var name = parsed[ "chart_id" ].get_str & "." & parsed[ "id" ].get_str
+            pivot[ "hostname" ] = parsed[ "hostname" ]
+            pivot[ "timestamp" ] = parsed[ "timestamp" ]
+            pivot[ name ] = parsed[ "value" ]
+        except:
+            continue
 
-        var name = parsed[ "chart_id" ].get_str & "." & parsed[ "id" ].get_str
-        pivot[ "hostname" ] = parsed[ "hostname" ]
-        pivot[ name ] = parsed[ "value" ]
-
-    return result
+    for timestamp, sample in pivoted_data:
+        result.add( sample )
 
 
-proc process( client: Socket, db: DBConn, conf: Config ): int =
+proc write_to_database( samples: seq[ JsonNode ] ): void =
+    ## Given a sequence of json samples, write them to database.
+    if samples.len == 0: return
+
+    let db = open( "", "", "", conf.dbopts )
+
+    try:
+        db.exec sql( "BEGIN" )
+        for sample in samples:
+            var
+                timestamp = sample[ "timestamp" ].get_num
+                host = sample[ "hostname" ].get_str
+            sample.delete( "timestamp" )
+            sample.delete( "hostname" )
+            db.exec sql( INSERT_SQL ), timestamp, host, sample
+        db.exec sql( "COMMIT" )
+    except:
+        let
+            e = getCurrentException()
+            msg = getCurrentExceptionMsg()
+        echo "Got exception ", repr(e), " while writing to DB: ", msg
+        discard
+
+    db.close
+
+
+proc process( client: Socket, address: string ): void =
     ## Do the work for a connected client within a thread.
-    ## Returns the number of samples parsed.
+    ## Returns the formatted json data keyed on sample time.
+    let t0 = cpu_time()
     var raw_data = client.fetch_data
 
     # Done with the socket, netdata will automatically
@@ -163,29 +185,15 @@
     except OSError:
         return
 
-    # Pivot data and save to SQL.
-    #
-    var samples = parse_data( raw_data, conf )
-    if samples.len != 0:
-        db.exec sql( "BEGIN" )
-        for timestamp, sample in samples:
-            var host = sample[ "hostname" ].get_str
-            sample.delete( "hostname" )
-            db.exec sql( INSERT_SQL ), timestamp, host, sample
-        db.exec sql( "COMMIT" )
-
-    return samples.len
-
-
-proc runthread( client: Socket, address: string, db: DBConn, conf: Config ): void {.thread.} =
-    ## A thread that performs that dispatches processing and returns
-    ## results.
-    let t0 = cpu_time()
-    var samples = client.process( db, conf )
+    # Pivot the parsed data to a single JSON blob per sample time.
+    var samples = parse_data( raw_data )
+    write_to_database( samples )
 
     if conf.verbose:
         echo(
-            hl( $samples, fgWhite, bright=true ),
+            hl( $(epochTime().to_int), fgMagenta, bright=true ),
+            " ",
+            hl( $(samples.len), fgWhite, bright=true ),
             " sample(s) parsed from ",
             address.hl( fgYellow, bright=true ),
             " in ", hl($( round(cpu_time() - t0, 3) ), fgWhite, bright=true), " seconds."
@@ -193,16 +201,16 @@
         )
 
 
-proc serverloop: void =
+proc serverloop( conf: Config ): void =
     ## Open a database connection, bind to the listening socket,
     ## and start serving incoming netdata streams.
     let db = open( "", "", "", conf.dbopts )
-    if conf.verbose: echo( "Successfully connected to the backend database.".hl( fgGreen ) )
+    if conf.verbose: echo( "Successfully tested connection to the backend database.".hl( fgGreen ) )
+    db.close
 
-    var
-        conn_count = 0
-        server = newSocket()
-
+    # Setup listening socket.
+    #
+    var server = newSocket()
     server.set_sock_opt( OptReuseAddr, true )
     server.bind_addr( Port(conf.listen_port), conf.listen_addr )
     server.listen()
@@ -216,29 +224,42 @@
         )
         echo ""
 
+    # Wait for incoming connections, fork for each client.
+    #
     while true:
-        var client  = newSocket()
-        var address = ""
+        var
+            client  = new Socket
+            address = ""
+            status: cint = 0
 
-        # Force a garbage collection pass.
-        #
-        conn_count = conn_count + 1
-        if conn_count == 25:
-            when defined( testing ): echo "Forcing GC pass."
-            GC_full_collect()
-            conn_count = 1
+        server.acceptAddr( client, address ) # block
+
+        if fork() == 0:
+            server.close
+            client.process( address )
+            quit( 0 )
 
         client.close
-        server.acceptAddr( client, address ) # blocking call
-        spawn runthread( client, address, db, conf )
+
+        discard waitpid( P_ALL, status, WNOHANG ) # reap all previous children
         when defined( testing ): dumpNumberOfInstances()
 
 
-proc parse_cmdline: void =
+proc parse_cmdline: Config =
     ## Populate the config object with the user's preferences.
 
+    # Config object defaults.
+    #
+    result = Config(
+        dbopts: "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay",
+        listen_port: 14866,
+        listen_addr: "0.0.0.0",
+        verbose: true,
+        debug: false
+    )
+
     # always set debug mode if development build.
-    conf.debug = defined( testing )
+    result.debug = defined( testing )
 
     for kind, key, val in getopt():
         case kind
@@ -249,25 +270,22 @@
         of cmdLongOption, cmdShortOption:
             case key
                 of "debug", "d":
-                    conf.debug = true
-
-                of "no-color", "c":
-                    conf.use_color = false
+                    result.debug = true
 
                 of "help", "h":
                     echo USAGE
                     quit( 0 )
 
                 of "quiet", "q":
-                    conf.verbose = false
+                    result.verbose = false
             
                 of "version", "v":
                     echo hl( "netdata_tsrelay " & VERSION, fgWhite, bright=true )
                     quit( 0 )
                
-                of "dbopts": conf.dbopts = val
-                of "listen-addr", "a": conf.listen_addr = val
-                of "listen-port", "p": conf.listen_port = val.parse_int
+                of "dbopts": result.dbopts = val
+                of "listen-addr", "a": result.listen_addr = val
+                of "listen-port", "p": result.listen_port = val.parse_int
 
                 else: discard
 
@@ -277,8 +295,8 @@
 when isMainModule:
     system.addQuitProc( resetAttributes )
 
-    parse_cmdline()
+    conf = parse_cmdline()
     if conf.debug: echo hl( $conf, fgYellow )
 
-    serverloop()
+    serverloop( conf )