netdata_tsrelay.nim
changeset 11 475c9942eb15
parent 9 aa9d537f7067
child 13 e1777929ba15
equal deleted inserted replaced
10:252cdb26f76b 11:475c9942eb15
    49 ./netdata_tsrelay [-q][-v][-h] --dbopts="[PostgreSQL connection string]" --listen-port=14866 --listen-addr=0.0.0.0
    49 ./netdata_tsrelay [-q][-v][-h] --dbopts="[PostgreSQL connection string]" --listen-port=14866 --listen-addr=0.0.0.0
    50 
    50 
    51   -q: Quiet mode.  No output at all.  Ignored if -d is supplied.
    51   -q: Quiet mode.  No output at all.  Ignored if -d is supplied.
    52   -d: Debug: Show incoming and parsed data.
    52   -d: Debug: Show incoming and parsed data.
    53   -v: Display version number.
    53   -v: Display version number.
       
    54   -T: Change the destination table name from the default 'netdata'.
       
    55   -t: Alter the maximum time (in ms) an open socket waits for data.  Default: 500ms.
    54   -h: Help.  You're lookin' at it.
    56   -h: Help.  You're lookin' at it.
    55 
    57 
    56 The default connection string is:
    58 The default connection string is:
    57   "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay"
    59   "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay"
    58     """
    60     """
    59     INSERT_SQL = """
    61     INSERT_SQL = """
    60     INSERT INTO netdata
    62     INSERT INTO $1
    61         ( time, host, metrics )
    63         ( time, host, metrics )
    62     VALUES
    64     VALUES
    63         ( 'epoch'::timestamptz + ? * '1 second'::interval, ?, ? )
    65         ( 'epoch'::timestamptz + ? * '1 second'::interval, ?, ? )
    64     """
    66     """
    65 
    67 
    66 
    68 
    67 type
    69 type
    68     Config = object of RootObj
    70     Config = object of RootObj
    69         dbopts:      string  # The postgresql connection parameters.  (See https://www.postgresql.org/docs/current/static/libpq-connect.html)
    71         dbopts:      string  # The postgresql connection parameters.  (See https://www.postgresql.org/docs/current/static/libpq-connect.html)
    70         listen_port: int     # The port to listen for incoming connections
    72         dbtable:     string  # The name of the table to write to.
       
    73         listen_port: int     # The port to listen for incoming connections.
    71         listen_addr: string  # The IP address listen for incoming connections.  Defaults to inaddr_any.
    74         listen_addr: string  # The IP address listen for incoming connections.  Defaults to inaddr_any.
    72         verbose:     bool    # Be informative
    75         verbose:     bool    # Be informative
    73         debug:       bool    # Spew out raw data
    76         debug:       bool    # Spew out raw data
       
    77         insertsql:   string  # The SQL insert string after interpolating the table name.
       
    78         timeout:     int     # How long to block, waiting on connection data.
    74 
    79 
    75 # Global configuration
    80 # Global configuration
    76 var conf: Config
    81 var conf: Config
    77 
    82 
    78 
    83 
    89 proc fetch_data( client: Socket ): string =
    94 proc fetch_data( client: Socket ): string =
    90     ## Netdata JSON backend doesn't send a length, so we read line by
    95     ## Netdata JSON backend doesn't send a length, so we read line by
    91     ## line and wait for stream timeout to determine a "sample".
    96     ## line and wait for stream timeout to determine a "sample".
    92     var buf: string = nil
    97     var buf: string = nil
    93     try:
    98     try:
    94         result = client.recv_line( timeout=500 )
    99         result = client.recv_line( timeout=conf.timeout )
    95         if result != "" and not result.is_nil: result = result & "\n"
   100         if result != "" and not result.is_nil: result = result & "\n"
    96         while buf != "":
   101         while buf != "":
    97             buf = client.recv_line( timeout=500 )
   102             buf = client.recv_line( timeout=conf.timeout )
    98             if buf != "" and not buf.is_nil: result = result & buf & "\n"
   103             if buf != "" and not buf.is_nil: result = result & buf & "\n"
    99     except TimeoutError:
   104     except TimeoutError:
   100         discard
   105         discard
   101 
   106 
   102 
   107 
   109     # Hash of sample timeperiods to pivoted json data
   114     # Hash of sample timeperiods to pivoted json data
   110     var pivoted_data = init_table[ BiggestInt, JsonNode ]()
   115     var pivoted_data = init_table[ BiggestInt, JsonNode ]()
   111 
   116 
   112     for sample in split_lines( data ):
   117     for sample in split_lines( data ):
   113         if sample == "" or sample.is_nil: continue
   118         if sample == "" or sample.is_nil: continue
   114         #if conf.debug: echo sample.hl( fgBlack, bright=true )
   119         if conf.debug: echo sample.hl( fgBlack, bright=true )
   115 
   120 
   116         var parsed: JsonNode
   121         var parsed: JsonNode
   117         try:
   122         try:
   118             parsed = sample.parse_json
   123             parsed = sample.parse_json
   119         except JsonParsingError:
   124         except JsonParsingError:
   156             var
   161             var
   157                 timestamp = sample[ "timestamp" ].get_num
   162                 timestamp = sample[ "timestamp" ].get_num
   158                 host = sample[ "hostname" ].get_str
   163                 host = sample[ "hostname" ].get_str
   159             sample.delete( "timestamp" )
   164             sample.delete( "timestamp" )
   160             sample.delete( "hostname" )
   165             sample.delete( "hostname" )
   161             db.exec sql( INSERT_SQL ), timestamp, host, sample
   166             db.exec sql( conf.insertsql ), timestamp, host, sample
   162         db.exec sql( "COMMIT" )
   167         db.exec sql( "COMMIT" )
   163     except:
   168     except:
   164         let
   169         let
   165             e = getCurrentException()
   170             e = getCurrentException()
   166             msg = getCurrentExceptionMsg()
   171             msg = getCurrentExceptionMsg()
   253 
   258 
   254     # Config object defaults.
   259     # Config object defaults.
   255     #
   260     #
   256     result = Config(
   261     result = Config(
   257         dbopts: "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay",
   262         dbopts: "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay",
       
   263         dbtable: "netdata",
   258         listen_port: 14866,
   264         listen_port: 14866,
   259         listen_addr: "0.0.0.0",
   265         listen_addr: "0.0.0.0",
   260         verbose: true,
   266         verbose: true,
   261         debug: false
   267         debug: false,
       
   268         timeout: 500,
       
   269         insertsql: INSERT_SQL % [ "netdata" ]
   262     )
   270     )
   263 
   271 
   264     # always set debug mode if development build.
   272     # always set debug mode if development build.
   265     result.debug = defined( testing )
   273     result.debug = defined( testing )
   266 
   274 
   279                     echo USAGE
   287                     echo USAGE
   280                     quit( 0 )
   288                     quit( 0 )
   281 
   289 
   282                 of "quiet", "q":
   290                 of "quiet", "q":
   283                     result.verbose = false
   291                     result.verbose = false
   284             
   292 
   285                 of "version", "v":
   293                 of "version", "v":
   286                     echo hl( "netdata_tsrelay " & VERSION, fgWhite, bright=true )
   294                     echo hl( "netdata_tsrelay " & VERSION, fgWhite, bright=true )
   287                     quit( 0 )
   295                     quit( 0 )
   288                
   296 
       
   297                 of "timeout", "t": result.timeout = val.parse_int
       
   298 
       
   299                 of "dbtable", "T":
       
   300                     result.insertsql = INSERT_SQL % [ val ]
   289                 of "dbopts": result.dbopts = val
   301                 of "dbopts": result.dbopts = val
       
   302 
   290                 of "listen-addr", "a": result.listen_addr = val
   303                 of "listen-addr", "a": result.listen_addr = val
   291                 of "listen-port", "p": result.listen_port = val.parse_int
   304                 of "listen-port", "p": result.listen_port = val.parse_int
   292 
   305 
   293                 else: discard
   306                 else: discard
   294 
   307