netdata_tsrelay.nim
changeset 8 1ef3f2d6d10e
parent 7 c0bcf3bea772
child 9 aa9d537f7067
equal deleted inserted replaced
7:c0bcf3bea772 8:1ef3f2d6d10e
     1 # vim: set et nosta sw=4 ts=4 ft=nim : 
     1 # vim: set et nosta sw=4 ts=4 :
       
     2 # im: set et nosta sw=4 ts=4 ft=nim : 
     2 #
     3 #
     3 # Copyright (c) 2018, Mahlon E. Smith <mahlon@martini.nu>
     4 # Copyright (c) 2018, Mahlon E. Smith <mahlon@martini.nu>
     4 # All rights reserved.
     5 # All rights reserved.
     5 # Redistribution and use in source and binary forms, with or without
     6 # Redistribution and use in source and binary forms, with or without
     6 # modification, are permitted provided that the following conditions are met:
     7 # modification, are permitted provided that the following conditions are met:
    34     math,
    35     math,
    35     nativesockets,
    36     nativesockets,
    36     net,
    37     net,
    37     os,
    38     os,
    38     parseopt2,
    39     parseopt2,
       
    40     posix,
    39     strutils,
    41     strutils,
    40     tables,
    42     tables,
    41     terminal,
    43     terminal,
    42     times,
    44     times
    43     threadpool
       
    44 
    45 
    45 
    46 
    46 const
    47 const
    47     VERSION = "v0.1.0"
    48     VERSION = "v0.1.0"
    48     USAGE = """
    49     USAGE = """
    49 ./netdata_tsrelay [-q][-v][-h] --dbopts="[PostgreSQL connection string]" --listen-port=14866 --listen-addr=0.0.0.0
    50 ./netdata_tsrelay [-q][-v][-h] --dbopts="[PostgreSQL connection string]" --listen-port=14866 --listen-addr=0.0.0.0
    50 
    51 
    51   -q: Quiet mode.  No output at all.  Ignored if -d is supplied.
    52   -q: Quiet mode.  No output at all.  Ignored if -d is supplied.
    52   -c: Suppress ANSI color output.
       
    53   -d: Debug: Show incoming and parsed data.
    53   -d: Debug: Show incoming and parsed data.
    54   -v: Display version number.
    54   -v: Display version number.
    55   -h: Help.  You're lookin' at it.
    55   -h: Help.  You're lookin' at it.
    56 
    56 
    57 The default connection string is:
    57 The default connection string is:
    70         dbopts:      string  # The postgresql connection parameters.  (See https://www.postgresql.org/docs/current/static/libpq-connect.html)
    70         dbopts:      string  # The postgresql connection parameters.  (See https://www.postgresql.org/docs/current/static/libpq-connect.html)
    71         listen_port: int     # The port to listen for incoming connections
    71         listen_port: int     # The port to listen for incoming connections
    72         listen_addr: string  # The IP address listen for incoming connections.  Defaults to inaddr_any.
    72         listen_addr: string  # The IP address listen for incoming connections.  Defaults to inaddr_any.
    73         verbose:     bool    # Be informative
    73         verbose:     bool    # Be informative
    74         debug:       bool    # Spew out raw data
    74         debug:       bool    # Spew out raw data
    75         use_color:   bool    # Pretty things up a little, probably want to disable this if debugging
    75 
    76 
    76 # Global configuration
    77 
    77 var conf: Config
    78 # The global config object
       
    79 #
       
    80 # FIXME:  Rather than pass this all over the
       
    81 # place, consider channels and createThread instead of spawn.
       
    82 #
       
    83 var conf = Config(
       
    84     dbopts: "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay",
       
    85     listen_port: 14866,
       
    86     listen_addr: "0.0.0.0",
       
    87     verbose: true,
       
    88     debug: false,
       
    89     use_color: true
       
    90 )
       
    91 
       
    92 
    78 
    93 proc hl( msg: string, fg: ForegroundColor, bright=false ): string =
    79 proc hl( msg: string, fg: ForegroundColor, bright=false ): string =
    94     ## Quick wrapper for color formatting a string, since the 'terminal'
    80     ## Quick wrapper for color formatting a string, since the 'terminal'
    95     ## module only deals with stdout directly.
    81     ## module only deals with stdout directly.
    96     if not conf.use_color: return msg
    82     if not isatty(stdout): return msg
    97 
    83 
    98     var color: BiggestInt = ord( fg )
    84     var color: BiggestInt = ord( fg )
    99     if bright: inc( color, 60 )
    85     if bright: inc( color, 60 )
   100     result = "\e[" & $color & 'm' & msg & "\e[0m"
    86     result = "\e[" & $color & 'm' & msg & "\e[0m"
   101 
    87 
   102 
    88 
   103 proc fetch_data( client: Socket ): string =
    89 proc fetch_data( client: Socket ): string =
   104     ## Netdata JSON backend doesn't send a length, so we read line by
    90     ## Netdata JSON backend doesn't send a length, so we read line by
   105     ## line and wait for stream timeout to determine a "sample".
    91     ## line and wait for stream timeout to determine a "sample".
       
    92     var buf: string = nil
   106     try:
    93     try:
   107         result = client.recv_line( timeout=500 ) & "\n"
    94         result = client.recv_line( timeout=500 )
   108         while result != "":
    95         if result != "" and not result.is_nil: result = result & "\n"
   109             result = result & client.recv_line( timeout=500 ) & "\n"
    96         while buf != "":
       
    97             buf = client.recv_line( timeout=500 )
       
    98             if buf != "" and not buf.is_nil: result = result & buf & "\n"
   110     except TimeoutError:
    99     except TimeoutError:
   111         discard
   100         discard
   112 
   101 
   113 
   102 
   114 proc parse_data( data: string, conf: Config ): Table[ BiggestInt, JsonNode ] =
   103 proc parse_data( data: string ): seq[ JsonNode ] =
   115     ## Given a raw +data+ string, parse JSON and return a table of
   104     ## Given a raw +data+ string, parse JSON and return a sequence
   116     ## JSON samples ready for writing, keyed by timestamp. Netdata can
   105     ## of JSON samples. Netdata can buffer multiple samples in one batch.
   117     ## buffer multiple samples in one batch.
   106     result = @[]
   118     if data == "": return
   107     if data == "" or data.is_nil: return
   119 
   108 
   120     # Hash of sample timeperiods to pivoted json data
   109     # Hash of sample timeperiods to pivoted json data
   121     result = init_table[ BiggestInt, JsonNode ]()
   110     var pivoted_data = init_table[ BiggestInt, JsonNode ]()
   122 
   111 
   123     for sample in split_lines( data ):
   112     for sample in split_lines( data ):
   124         if conf.debug: echo sample.hl( fgBlack, bright=true )
   113         if sample == "" or sample.is_nil: continue
   125         if sample.len == 0: continue
   114         #if conf.debug: echo sample.hl( fgBlack, bright=true )
   126 
   115 
   127         var parsed: JsonNode
   116         var parsed: JsonNode
   128         try:
   117         try:
   129             parsed = sample.parse_json
   118             parsed = sample.parse_json
   130         except JsonParsingError:
   119         except JsonParsingError:
   131             discard
       
   132             if conf.debug: echo hl( "Unable to parse sample line: " & sample.hl(fgRed, bright=true), fgRed )
   120             if conf.debug: echo hl( "Unable to parse sample line: " & sample.hl(fgRed, bright=true), fgRed )
       
   121             continue
       
   122         if parsed.kind != JObject: return
   133 
   123 
   134         # Create or use existing Json object for modded data.
   124         # Create or use existing Json object for modded data.
   135         #
   125         #
   136         var pivot: JsonNode
   126         var pivot: JsonNode
   137         let key = parsed["timestamp"].get_num
   127         try:
   138 
   128             let key = parsed["timestamp"].get_num
   139         if result.has_key( key ):
   129 
   140             pivot = result[ key ]
   130             if pivoted_data.has_key( key ):
   141         else:
   131                 pivot = pivoted_data[ key ]
   142             pivot = newJObject()
   132             else:
   143             result[ key ] = pivot
   133                 pivot = newJObject()
   144 
   134                 pivoted_data[ key ] = pivot
   145         var name = parsed[ "chart_id" ].get_str & "." & parsed[ "id" ].get_str
   135 
   146         pivot[ "hostname" ] = parsed[ "hostname" ]
   136             var name = parsed[ "chart_id" ].get_str & "." & parsed[ "id" ].get_str
   147         pivot[ name ] = parsed[ "value" ]
   137             pivot[ "hostname" ] = parsed[ "hostname" ]
   148 
   138             pivot[ "timestamp" ] = parsed[ "timestamp" ]
   149     return result
   139             pivot[ name ] = parsed[ "value" ]
   150 
   140         except:
   151 
   141             continue
   152 proc process( client: Socket, db: DBConn, conf: Config ): int =
   142 
       
   143     for timestamp, sample in pivoted_data:
       
   144         result.add( sample )
       
   145 
       
   146 
       
   147 proc write_to_database( samples: seq[ JsonNode ] ): void =
       
   148     ## Given a sequence of json samples, write them to database.
       
   149     if samples.len == 0: return
       
   150 
       
   151     let db = open( "", "", "", conf.dbopts )
       
   152 
       
   153     try:
       
   154         db.exec sql( "BEGIN" )
       
   155         for sample in samples:
       
   156             var
       
   157                 timestamp = sample[ "timestamp" ].get_num
       
   158                 host = sample[ "hostname" ].get_str
       
   159             sample.delete( "timestamp" )
       
   160             sample.delete( "hostname" )
       
   161             db.exec sql( INSERT_SQL ), timestamp, host, sample
       
   162         db.exec sql( "COMMIT" )
       
   163     except:
       
   164         let
       
   165             e = getCurrentException()
       
   166             msg = getCurrentExceptionMsg()
       
   167         echo "Got exception ", repr(e), " while writing to DB: ", msg
       
   168         discard
       
   169 
       
   170     db.close
       
   171 
       
   172 
       
   173 proc process( client: Socket, address: string ): void =
   153     ## Do the work for a connected client within a thread.
   174     ## Do the work for a connected client within a thread.
   154     ## Returns the number of samples parsed.
   175     ## Returns the formatted json data keyed on sample time.
       
   176     let t0 = cpu_time()
   155     var raw_data = client.fetch_data
   177     var raw_data = client.fetch_data
   156 
   178 
   157     # Done with the socket, netdata will automatically
   179     # Done with the socket, netdata will automatically
   158     # reconnect.  Save local resources/file descriptors
   180     # reconnect.  Save local resources/file descriptors
   159     # by closing after the send is considered complete.
   181     # by closing after the send is considered complete.
   161     try:
   183     try:
   162         client.close
   184         client.close
   163     except OSError:
   185     except OSError:
   164         return
   186         return
   165 
   187 
   166     # Pivot data and save to SQL.
   188     # Pivot the parsed data to a single JSON blob per sample time.
   167     #
   189     var samples = parse_data( raw_data )
   168     var samples = parse_data( raw_data, conf )
   190     write_to_database( samples )
   169     if samples.len != 0:
       
   170         db.exec sql( "BEGIN" )
       
   171         for timestamp, sample in samples:
       
   172             var host = sample[ "hostname" ].get_str
       
   173             sample.delete( "hostname" )
       
   174             db.exec sql( INSERT_SQL ), timestamp, host, sample
       
   175         db.exec sql( "COMMIT" )
       
   176 
       
   177     return samples.len
       
   178 
       
   179 
       
   180 proc runthread( client: Socket, address: string, db: DBConn, conf: Config ): void {.thread.} =
       
   181     ## A thread that performs that dispatches processing and returns
       
   182     ## results.
       
   183     let t0 = cpu_time()
       
   184     var samples = client.process( db, conf )
       
   185 
   191 
   186     if conf.verbose:
   192     if conf.verbose:
   187         echo(
   193         echo(
   188             hl( $samples, fgWhite, bright=true ),
   194             hl( $(epochTime().to_int), fgMagenta, bright=true ),
       
   195             " ",
       
   196             hl( $(samples.len), fgWhite, bright=true ),
   189             " sample(s) parsed from ",
   197             " sample(s) parsed from ",
   190             address.hl( fgYellow, bright=true ),
   198             address.hl( fgYellow, bright=true ),
   191             " in ", hl($( round(cpu_time() - t0, 3) ), fgWhite, bright=true), " seconds."
   199             " in ", hl($( round(cpu_time() - t0, 3) ), fgWhite, bright=true), " seconds."
   192             # " ", hl($(round((get_occupied_mem()/1024/1024),1)), fgWhite, bright=true), "MB memory used."
   200             # " ", hl($(round((get_occupied_mem()/1024/1024),1)), fgWhite, bright=true), "MB memory used."
   193         )
   201         )
   194 
   202 
   195 
   203 
   196 proc serverloop: void =
   204 proc serverloop( conf: Config ): void =
   197     ## Open a database connection, bind to the listening socket,
   205     ## Open a database connection, bind to the listening socket,
   198     ## and start serving incoming netdata streams.
   206     ## and start serving incoming netdata streams.
   199     let db = open( "", "", "", conf.dbopts )
   207     let db = open( "", "", "", conf.dbopts )
   200     if conf.verbose: echo( "Successfully connected to the backend database.".hl( fgGreen ) )
   208     if conf.verbose: echo( "Successfully tested connection to the backend database.".hl( fgGreen ) )
   201 
   209     db.close
   202     var
   210 
   203         conn_count = 0
   211     # Setup listening socket.
   204         server = newSocket()
   212     #
   205 
   213     var server = newSocket()
   206     server.set_sock_opt( OptReuseAddr, true )
   214     server.set_sock_opt( OptReuseAddr, true )
   207     server.bind_addr( Port(conf.listen_port), conf.listen_addr )
   215     server.bind_addr( Port(conf.listen_port), conf.listen_addr )
   208     server.listen()
   216     server.listen()
   209 
   217 
   210     if conf.verbose:
   218     if conf.verbose:
   214             ":",
   222             ":",
   215             hl( $conf.listen_port, fgBlue, bright=true ),
   223             hl( $conf.listen_port, fgBlue, bright=true ),
   216         )
   224         )
   217         echo ""
   225         echo ""
   218 
   226 
       
   227     # Wait for incoming connections, fork for each client.
       
   228     #
   219     while true:
   229     while true:
   220         var client  = newSocket()
   230         var
   221         var address = ""
   231             client  = new Socket
   222 
   232             address = ""
   223         # Force a garbage collection pass.
   233             status: cint = 0
   224         #
   234 
   225         conn_count = conn_count + 1
   235         server.acceptAddr( client, address ) # block
   226         if conn_count == 25:
   236 
   227             when defined( testing ): echo "Forcing GC pass."
   237         if fork() == 0:
   228             GC_full_collect()
   238             server.close
   229             conn_count = 1
   239             client.process( address )
       
   240             quit( 0 )
   230 
   241 
   231         client.close
   242         client.close
   232         server.acceptAddr( client, address ) # blocking call
   243 
   233         spawn runthread( client, address, db, conf )
   244         discard waitpid( P_ALL, status, WNOHANG ) # reap all previous children
   234         when defined( testing ): dumpNumberOfInstances()
   245         when defined( testing ): dumpNumberOfInstances()
   235 
   246 
   236 
   247 
   237 proc parse_cmdline: void =
   248 proc parse_cmdline: Config =
   238     ## Populate the config object with the user's preferences.
   249     ## Populate the config object with the user's preferences.
   239 
   250 
       
   251     # Config object defaults.
       
   252     #
       
   253     result = Config(
       
   254         dbopts: "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay",
       
   255         listen_port: 14866,
       
   256         listen_addr: "0.0.0.0",
       
   257         verbose: true,
       
   258         debug: false
       
   259     )
       
   260 
   240     # always set debug mode if development build.
   261     # always set debug mode if development build.
   241     conf.debug = defined( testing )
   262     result.debug = defined( testing )
   242 
   263 
   243     for kind, key, val in getopt():
   264     for kind, key, val in getopt():
   244         case kind
   265         case kind
   245 
   266 
   246         of cmdArgument:
   267         of cmdArgument:
   247             discard
   268             discard
   248 
   269 
   249         of cmdLongOption, cmdShortOption:
   270         of cmdLongOption, cmdShortOption:
   250             case key
   271             case key
   251                 of "debug", "d":
   272                 of "debug", "d":
   252                     conf.debug = true
   273                     result.debug = true
   253 
       
   254                 of "no-color", "c":
       
   255                     conf.use_color = false
       
   256 
   274 
   257                 of "help", "h":
   275                 of "help", "h":
   258                     echo USAGE
   276                     echo USAGE
   259                     quit( 0 )
   277                     quit( 0 )
   260 
   278 
   261                 of "quiet", "q":
   279                 of "quiet", "q":
   262                     conf.verbose = false
   280                     result.verbose = false
   263             
   281             
   264                 of "version", "v":
   282                 of "version", "v":
   265                     echo hl( "netdata_tsrelay " & VERSION, fgWhite, bright=true )
   283                     echo hl( "netdata_tsrelay " & VERSION, fgWhite, bright=true )
   266                     quit( 0 )
   284                     quit( 0 )
   267                
   285                
   268                 of "dbopts": conf.dbopts = val
   286                 of "dbopts": result.dbopts = val
   269                 of "listen-addr", "a": conf.listen_addr = val
   287                 of "listen-addr", "a": result.listen_addr = val
   270                 of "listen-port", "p": conf.listen_port = val.parse_int
   288                 of "listen-port", "p": result.listen_port = val.parse_int
   271 
   289 
   272                 else: discard
   290                 else: discard
   273 
   291 
   274         of cmdEnd: assert( false ) # shouldn't reach here ever
   292         of cmdEnd: assert( false ) # shouldn't reach here ever
   275 
   293 
   276 
   294 
   277 when isMainModule:
   295 when isMainModule:
   278     system.addQuitProc( resetAttributes )
   296     system.addQuitProc( resetAttributes )
   279 
   297 
   280     parse_cmdline()
   298     conf = parse_cmdline()
   281     if conf.debug: echo hl( $conf, fgYellow )
   299     if conf.debug: echo hl( $conf, fgYellow )
   282 
   300 
   283     serverloop()
   301     serverloop( conf )
   284 
   302