netdata_tsrelay.nim
changeset 19 1f09cfb560e0
parent 17 96b8799a565a
child 21 a2fe9ec4cdf2
equal deleted inserted replaced
18:a135fdaed52b 19:1f09cfb560e0
     1 # vim: set et nosta sw=4 ts=4 :
     1 # vim: set et nosta sw=4 ts=4 :
     2 #
     2 #
     3 # Copyright (c) 2018, Mahlon E. Smith <mahlon@martini.nu>
     3 # Copyright (c) 2018-2020, Mahlon E. Smith <mahlon@martini.nu>
     4 # All rights reserved.
     4 # All rights reserved.
     5 # Redistribution and use in source and binary forms, with or without
     5 # Redistribution and use in source and binary forms, with or without
     6 # modification, are permitted provided that the following conditions are met:
     6 # modification, are permitted provided that the following conditions are met:
     7 #
     7 #
     8 #     * Redistributions of source code must retain the above copyright
     8 #     * Redistributions of source code must retain the above copyright
    32     db_postgres,
    32     db_postgres,
    33     json,
    33     json,
    34     math,
    34     math,
    35     nativesockets,
    35     nativesockets,
    36     net,
    36     net,
    37     os,
       
    38     parseopt,
    37     parseopt,
    39     posix,
    38     posix,
    40     strutils,
    39     strutils,
       
    40     strformat,
    41     tables,
    41     tables,
    42     terminal,
    42     terminal,
    43     times
    43     times
    44 
    44 
    45 
    45 
    46 const
    46 const
    47     VERSION = "v0.2.0"
    47     VERSION = "v0.3.0"
    48     USAGE = """
    48     USAGE = """
    49 ./netdata_tsrelay [-d][-h][-q][-t][-T][-v] --dbopts="[PostgreSQL connection string]" --listen-port=14866 --listen-addr=0.0.0.0
    49 ./netdata_tsrelay [-adDhopPqtTv]
    50 
    50 
    51   -q: Quiet mode.  No output at all.  Ignored if -d is supplied.
    51   -a --listen-addr:
    52   -d: Debug: Show incoming and parsed data.
    52     The IP address to listen on for netdata streams.
    53   -v: Display version number.
    53 
    54   -T: Change the destination table name from the default 'netdata'.
    54   -d --debug:
    55   -t: Alter the maximum time (in ms) an open socket waits for data.  Default: 500ms.
    55     Debug: Show incoming and parsed data.
    56   -h: Help.  You're lookin' at it.
    56 
    57 
    57   -D --dropconn:
    58 The default connection string is:
    58     Drop the persistent socket to netdata between samples to conserve
    59   "host=localhost dbname=netdata application_name=netdata-tsrelay"
    59     local resources.  This may be helpful with a large number of clients.
       
    60     Defaults to false.
       
    61 
       
    62   -h --help:
       
    63     Help.  You're lookin' at it.
       
    64 
       
    65   -o --dbopts:
       
    66     The PostgreSQL connection string parameters.
       
    67     The default connection string is:
       
    68       "host=localhost dbname=netdata application_name=netdata-tsrelay"
       
    69 
       
    70   -p --listen-port:
       
    71     Change the listening port from the default (14866).
       
    72 
       
    73   -P --persistent:
       
    74     Don't disconnect from the database between samples.  This may be
       
    75     more efficient with a small number of clients, when not using a
       
    76     pooler, or with a very high sample size/rate.  Defaults to false.
       
    77 
       
    78   -q --quiet:
       
    79     Quiet mode.  No output at all.  Ignored if -d is supplied.
       
    80 
       
    81   -T --dbtable:
       
    82     Change the destination table name from the default (netdata).
       
    83 
       
    84   -t --timeout:
       
    85     Alter the maximum time (in ms) an open socket waits for data
       
    86     before processing the sample.  Default: 500ms.
       
    87 
       
    88   -v --version:
       
    89     Display version number.
       
    90 
    60     """
    91     """
    61     INSERT_SQL = """
    92     INSERT_SQL = """
    62     INSERT INTO $1
    93     INSERT INTO $1
    63         ( time, host, metrics )
    94         ( time, host, metrics )
    64     VALUES
    95     VALUES
    68 
    99 
    69 type
   100 type
    70     Config = object of RootObj
   101     Config = object of RootObj
    71         dbopts:      string  # The postgresql connection parameters.  (See https://www.postgresql.org/docs/current/static/libpq-connect.html)
   102         dbopts:      string  # The postgresql connection parameters.  (See https://www.postgresql.org/docs/current/static/libpq-connect.html)
    72         dbtable:     string  # The name of the table to write to.
   103         dbtable:     string  # The name of the table to write to.
       
   104         dropconn:    bool    # Close the TCP connection between samples.
       
   105         persistent:  bool    # Don't close the database handle between samples.
    73         listen_port: int     # The port to listen for incoming connections.
   106         listen_port: int     # The port to listen for incoming connections.
    74         listen_addr: string  # The IP address to listen on for incoming connections.  Defaults to inaddr_any.
   107         listen_addr: string  # The IP address to listen on for incoming connections.  Defaults to inaddr_any.
    75         verbose:     bool    # Be informative
   108         verbose:     bool    # Be informative
    76         debug:       bool    # Spew out raw data
   109         debug:       bool    # Spew out raw data
    77         insertsql:   string  # The SQL insert string after interpolating the table name.
   110         insertsql:   string  # The SQL insert string after interpolating the table name.
    78         timeout:     int     # How long to block, waiting on connection data.
   111         timeout:     int     # How long to block, waiting on connection data.
    79 
   112 
       
   113 
       
   114 type
       
   115     NetdataClient = ref object
       
   116         sock: Socket    # The raw socket fd
       
   117         address: string # The remote IP address
       
   118         db: DbConn      # An optionally persistent database handle
       
   119 
       
   120 
    80 # Global configuration
   121 # Global configuration
    81 var conf: Config
   122 var conf: Config
    82 
   123 
    83 
   124 
    84 proc hl( msg: string, fg: ForegroundColor, bright=false ): string =
   125 proc hl( msg: string, fg: ForegroundColor, bright=false ): string =
    89     var color: BiggestInt = ord( fg )
   130     var color: BiggestInt = ord( fg )
    90     if bright: inc( color, 60 )
   131     if bright: inc( color, 60 )
    91     result = "\e[" & $color & 'm' & msg & "\e[0m"
   132     result = "\e[" & $color & 'm' & msg & "\e[0m"
    92 
   133 
    93 
   134 
    94 proc fetch_data( client: Socket ): string =
   135 proc fetch_data( client: NetdataClient ): string =
    95     ## Netdata JSON backend doesn't send a length, so we read line by
   136     ## Netdata JSON backend doesn't send a length nor a separator
    96     ## line and wait for stream timeout to determine a "sample".
   137     ## between samples, so we read line by line and wait for stream
       
   138     ## timeout to determine what constitutes a sample.
    97     var buf = ""
   139     var buf = ""
    98     try:
   140     while true:
    99         while true:
   141         try:
   100             client.readline( buf, timeout=conf.timeout )
   142             client.sock.readline( buf, timeout=conf.timeout )
   101             if buf != "": result = result & buf & "\n"
   143             if buf == "":
   102     except TimeoutError:
   144                 if conf.debug: echo "Client {client.address} closed socket.".fmt.hl( fgRed, bright=true )
   103         return
   145                 quit( 1 )
       
   146 
       
   147             result = result & buf & "\n"
       
   148 
       
   149         except OSError:
       
   150             quit( 1 )
       
   151         except TimeoutError:
       
   152             if result == "": continue 
       
   153             return
   104 
   154 
   105 
   155 
   106 proc parse_data( data: string ): seq[ JsonNode ] =
   156 proc parse_data( data: string ): seq[ JsonNode ] =
   107     ## Given a raw +data+ string, parse JSON and return a sequence
   157     ## Given a raw +data+ string, parse JSON and return a sequence
   108     ## of JSON samples. Netdata can buffer multiple samples in one batch.
   158     ## of JSON samples. Netdata can buffer multiple samples in one batch.
   145 
   195 
   146     for timestamp, sample in pivoted_data:
   196     for timestamp, sample in pivoted_data:
   147         result.add( sample )
   197         result.add( sample )
   148 
   198 
   149 
   199 
   150 proc write_to_database( samples: seq[ JsonNode ] ): void =
   200 proc write_to_database( client: NetdataClient, samples: seq[ JsonNode ] ): void =
   151     ## Given a sequence of json samples, write them to database.
   201     ## Given a sequence of json samples, write them to database.
   152     if samples.len == 0: return
   202     if samples.len == 0: return
   153 
   203 
   154     let db = open( "", "", "", conf.dbopts )
   204     if client.db.isNil:
       
   205         client.db = open( "", "", "", conf.dbopts )
   155 
   206 
   156     try:
   207     try:
   157         db.exec sql( "BEGIN" )
   208         client.db.exec sql( "BEGIN" )
   158         for sample in samples:
   209         for sample in samples:
   159             var
   210             var
   160                 timestamp = sample[ "timestamp" ].get_int
   211                 timestamp = sample[ "timestamp" ].get_int
   161                 host = sample[ "hostname" ].get_str.to_lowerascii
   212                 host = sample[ "hostname" ].get_str.to_lowerascii
   162             sample.delete( "timestamp" )
   213             sample.delete( "timestamp" )
   163             sample.delete( "hostname" )
   214             sample.delete( "hostname" )
   164             db.exec sql( conf.insertsql ), timestamp, host, sample
   215             client.db.exec sql( conf.insertsql ), timestamp, host, sample
   165         db.exec sql( "COMMIT" )
   216         client.db.exec sql( "COMMIT" )
   166     except:
   217     except:
   167         let
   218         let
   168             e = getCurrentException()
   219             e = getCurrentException()
   169             msg = getCurrentExceptionMsg()
   220             msg = getCurrentExceptionMsg()
   170         echo "Got exception ", repr(e), " while writing to DB: ", msg
   221         echo "Got exception ", repr(e), " while writing to DB: ", msg
   171         discard
   222         discard
   172 
   223 
   173     db.close
   224     if not conf.persistent:
   174 
   225         client.db.close
   175 
   226         client.db = nil
   176 proc process( client: Socket, address: string ): void =
   227 
       
   228 
       
   229 proc process( client: NetdataClient ): void =
   177     ## Do the work for a connected client within child process.
   230     ## Do the work for a connected client within child process.
   178     let t0 = cpu_time()
   231     let t0 = cpu_time()
   179     var raw_data = client.fetch_data
   232     var raw_data = client.fetch_data
   180 
   233 
   181     # Done with the socket, netdata will automatically
   234     # Done with the socket, netdata will automatically
   182     # reconnect.  Save local resources/file descriptors
   235     # reconnect.  Save local resources/file descriptors
   183     # by closing after the send is considered complete.
   236     # by closing after the send is considered complete.
   184     #
   237     #
   185     try:
   238     if conf.dropconn:
   186         client.close
   239         try:
   187     except OSError:
   240             client.sock.close
   188         return
   241         except OSError:
       
   242             return
   189 
   243 
   190     # Pivot the parsed data to a single JSON blob per sample time.
   244     # Pivot the parsed data to a single JSON blob per sample time.
   191     var samples = parse_data( raw_data )
   245     var samples = parse_data( raw_data )
   192     write_to_database( samples )
   246     client.write_to_database( samples )
   193 
   247 
   194     if conf.verbose:
   248     if conf.verbose:
       
   249         let cputime = cpu_time() - t0
   195         echo(
   250         echo(
   196             hl( $(epochTime().to_int), fgMagenta, bright=true ),
   251             hl( $(epochTime().to_int), fgMagenta, bright=true ),
   197             " ",
   252             " ",
   198             hl( $(samples.len), fgWhite, bright=true ),
   253             hl( $(samples.len), fgWhite, bright=true ),
   199             " sample(s) parsed from ",
   254             " sample(s) parsed from ",
   200             address.hl( fgYellow, bright=true ),
   255             client.address.hl( fgYellow, bright=true ),
   201             " in ", hl($( round(cpu_time() - t0, 3) ), fgWhite, bright=true), " seconds."
   256             " in ", hl( "{cputime:<2.3f}".fmt, fgWhite, bright=true), " seconds."
   202             # " ", hl($(round((get_occupied_mem()/1024/1024),1)), fgWhite, bright=true), "MB memory used."
       
   203         )
   257         )
   204 
   258 
   205 
   259 
   206 proc serverloop( conf: Config ): void =
   260 proc serverloop( conf: Config ): void =
   207     ## Open a database connection, bind to the listening socket,
   261     ## Open a database connection, bind to the listening socket,
   233         echo ""
   287         echo ""
   234 
   288 
   235     # Wait for incoming connections, fork for each client.
   289     # Wait for incoming connections, fork for each client.
   236     #
   290     #
   237     while true:
   291     while true:
   238         var
   292         let client = NetdataClient.new
   239             client  = new Socket
   293         client.sock = Socket.new
   240             address = ""
       
   241 
   294 
   242         # Block, waiting for new connections.
   295         # Block, waiting for new connections.
   243         server.acceptAddr( client, address )
   296         server.acceptAddr( client.sock, client.address )
   244 
   297 
   245         if fork() == 0:
   298         if fork() == 0:
   246             server.close
   299             server.close
   247             client.process( address )
   300             if conf.dropconn:
   248             quit( 0 )
   301                 # "one shot" mode.
   249 
   302                 client.process
   250         client.close
   303                 quit( 0 )
       
   304             else:
       
   305                 # Keep the connection to netdata open.
       
   306                 while true: client.process
       
   307 
       
   308         client.sock.close
   251         when defined( testing ): dumpNumberOfInstances()
   309         when defined( testing ): dumpNumberOfInstances()
   252 
   310 
   253 
   311 
   254 proc parse_cmdline: Config =
   312 proc parse_cmdline: Config =
   255     ## Populate the config object with the user's preferences.
   313     ## Populate the config object with the user's preferences.
   257     # Config object defaults.
   315     # Config object defaults.
   258     #
   316     #
   259     result = Config(
   317     result = Config(
   260         dbopts: "host=localhost dbname=netdata application_name=netdata-tsrelay",
   318         dbopts: "host=localhost dbname=netdata application_name=netdata-tsrelay",
   261         dbtable: "netdata",
   319         dbtable: "netdata",
       
   320         dropconn: false,
   262         listen_port: 14866,
   321         listen_port: 14866,
   263         listen_addr: "0.0.0.0",
   322         listen_addr: "0.0.0.0",
   264         verbose: true,
   323         verbose: true,
   265         debug: false,
   324         debug: false,
   266         timeout: 500,
   325         timeout: 500,
       
   326         persistent: false,
   267         insertsql: INSERT_SQL % [ "netdata" ]
   327         insertsql: INSERT_SQL % [ "netdata" ]
   268     )
   328     )
   269 
   329 
   270     # always set debug mode if development build.
   330     # always set debug mode if development build.
   271     result.debug = defined( testing )
   331     result.debug = defined( testing )
   279         of cmdLongOption, cmdShortOption:
   339         of cmdLongOption, cmdShortOption:
   280             case key
   340             case key
   281                 of "debug", "d":
   341                 of "debug", "d":
   282                     result.debug = true
   342                     result.debug = true
   283 
   343 
       
   344                 of "dropconn", "D":
       
   345                     if result.persistent:
       
   346                         echo "Dropping TCP sockets are incompatible with persistent database connections."
       
   347                         quit( 1 )
       
   348                     result.dropconn = true
       
   349 
   284                 of "help", "h":
   350                 of "help", "h":
   285                     echo USAGE
   351                     echo USAGE
   286                     quit( 0 )
   352                     quit( 0 )
   287 
   353 
   288                 of "quiet", "q":
   354                 of "quiet", "q":
   294 
   360 
   295                 of "timeout", "t": result.timeout = val.parse_int
   361                 of "timeout", "t": result.timeout = val.parse_int
   296 
   362 
   297                 of "dbtable", "T":
   363                 of "dbtable", "T":
   298                     result.insertsql = INSERT_SQL % [ val ]
   364                     result.insertsql = INSERT_SQL % [ val ]
   299                 of "dbopts": result.dbopts = val
   365                 of "dbopts", "o": result.dbopts = val
   300 
   366 
   301                 of "listen-addr", "a": result.listen_addr = val
   367                 of "listen-addr", "a": result.listen_addr = val
   302                 of "listen-port", "p": result.listen_port = val.parse_int
   368                 of "listen-port", "p": result.listen_port = val.parse_int
       
   369 
       
   370                 of "persistent", "P":
       
   371                     if result.dropconn:
       
   372                         echo "Persistent database connections are incompatible with dropping TCP sockets."
       
   373                         quit( 1 )
       
   374                     result.persistent = true
   303 
   375 
   304                 else: discard
   376                 else: discard
   305 
   377 
   306         of cmdEnd: assert( false ) # shouldn't reach here ever
   378         of cmdEnd: assert( false ) # shouldn't reach here ever
   307 
   379