netdata_tsrelay.nim
changeset 16 fce5b4150c09
parent 15 ed87882bb7f0
child 17 96b8799a565a
equal deleted inserted replaced
15:ed87882bb7f0 16:fce5b4150c09
    33     json,
    33     json,
    34     math,
    34     math,
    35     nativesockets,
    35     nativesockets,
    36     net,
    36     net,
    37     os,
    37     os,
    38     parseopt2,
    38     parseopt,
    39     posix,
    39     posix,
    40     strutils,
    40     strutils,
    41     tables,
    41     tables,
    42     terminal,
    42     terminal,
    43     times
    43     times
    44 
    44 
    45 
    45 
    46 const
    46 const
    47     VERSION = "v0.1.1"
    47     VERSION = "v0.2.0"
    48     USAGE = """
    48     USAGE = """
    49 ./netdata_tsrelay [-q][-v][-h] --dbopts="[PostgreSQL connection string]" --listen-port=14866 --listen-addr=0.0.0.0
    49 ./netdata_tsrelay [-q][-v][-h] --dbopts="[PostgreSQL connection string]" --listen-port=14866 --listen-addr=0.0.0.0
    50 
    50 
    51   -q: Quiet mode.  No output at all.  Ignored if -d is supplied.
    51   -q: Quiet mode.  No output at all.  Ignored if -d is supplied.
    52   -d: Debug: Show incoming and parsed data.
    52   -d: Debug: Show incoming and parsed data.
    92 
    92 
    93 
    93 
    94 proc fetch_data( client: Socket ): string =
    94 proc fetch_data( client: Socket ): string =
    95     ## Netdata JSON backend doesn't send a length, so we read line by
    95     ## Netdata JSON backend doesn't send a length, so we read line by
    96     ## line and wait for stream timeout to determine a "sample".
    96     ## line and wait for stream timeout to determine a "sample".
    97     var buf: string = nil
    97     var buf: string = ""
    98     try:
    98     try:
    99         result = client.recv_line( timeout=conf.timeout )
    99         result = client.recv_line( timeout=conf.timeout )
   100         if result != "" and not result.is_nil: result = result & "\n"
   100         if result != "": result = result & "\n"
   101         while buf != "":
   101         while buf != "":
   102             buf = client.recv_line( timeout=conf.timeout )
   102             buf = client.recv_line( timeout=conf.timeout )
   103             if buf != "" and not buf.is_nil: result = result & buf & "\n"
   103             if buf != "": result = result & buf & "\n"
   104     except TimeoutError:
   104     except TimeoutError:
   105         discard
   105         discard
   106 
   106 
   107 
   107 
   108 proc parse_data( data: string ): seq[ JsonNode ] =
   108 proc parse_data( data: string ): seq[ JsonNode ] =
   109     ## Given a raw +data+ string, parse JSON and return a sequence
   109     ## Given a raw +data+ string, parse JSON and return a sequence
   110     ## of JSON samples. Netdata can buffer multiple samples in one batch.
   110     ## of JSON samples. Netdata can buffer multiple samples in one batch.
   111     result = @[]
   111     result = @[]
   112     if data == "" or data.is_nil: return
   112     if data == "": return
   113 
   113 
   114     # Hash of sample timeperiods to pivoted json data
   114     # Hash of sample timeperiods to pivoted json data
   115     var pivoted_data = init_table[ BiggestInt, JsonNode ]()
   115     var pivoted_data = init_table[ BiggestInt, JsonNode ]()
   116 
   116 
   117     for sample in split_lines( data ):
   117     for sample in split_lines( data ):
   118         if sample == "" or sample.is_nil: continue
   118         if sample == "": continue
   119         if conf.debug: echo sample.hl( fgBlack, bright=true )
   119         if conf.debug: echo sample.hl( fgBlack, bright=true )
   120 
   120 
   121         var parsed: JsonNode
   121         var parsed: JsonNode
   122         try:
   122         try:
   123             parsed = sample.parse_json
   123             parsed = sample.parse_json
   128 
   128 
   129         # Create or use existing Json object for modded data.
   129         # Create or use existing Json object for modded data.
   130         #
   130         #
   131         var pivot: JsonNode
   131         var pivot: JsonNode
   132         try:
   132         try:
   133             let key = parsed[ "timestamp" ].get_num
   133             let key = parsed[ "timestamp" ].get_int
   134 
   134 
   135             if pivoted_data.has_key( key ):
   135             if pivoted_data.has_key( key ):
   136                 pivot = pivoted_data[ key ]
   136                 pivot = pivoted_data[ key ]
   137             else:
   137             else:
   138                 pivot = newJObject()
   138                 pivot = newJObject()
   157 
   157 
   158     try:
   158     try:
   159         db.exec sql( "BEGIN" )
   159         db.exec sql( "BEGIN" )
   160         for sample in samples:
   160         for sample in samples:
   161             var
   161             var
   162                 timestamp = sample[ "timestamp" ].get_num
   162                 timestamp = sample[ "timestamp" ].get_int
   163                 host = sample[ "hostname" ].get_str.to_lowerascii
   163                 host = sample[ "hostname" ].get_str.to_lowerascii
   164             sample.delete( "timestamp" )
   164             sample.delete( "timestamp" )
   165             sample.delete( "hostname" )
   165             sample.delete( "hostname" )
   166             db.exec sql( conf.insertsql ), timestamp, host, sample
   166             db.exec sql( conf.insertsql ), timestamp, host, sample
   167         db.exec sql( "COMMIT" )
   167         db.exec sql( "COMMIT" )