netdata_tsrelay.nim
changeset 5 a1276c3d39eb
parent 4 f3d83bdd7877
child 6 1f366fc61592
equal deleted inserted replaced
4:f3d83bdd7877 5:a1276c3d39eb
    29 
    29 
    30 
    30 
    31 import
    31 import
    32     db_postgres,
    32     db_postgres,
    33     json,
    33     json,
       
    34     math,
    34     nativesockets,
    35     nativesockets,
    35     net,
    36     net,
       
    37     os,
    36     parseopt2,
    38     parseopt2,
    37     strutils,
    39     strutils,
    38     tables,
    40     tables,
       
    41     terminal,
       
    42     times,
    39     threadpool
    43     threadpool
    40 
    44 
    41 
    45 
    42 const
    46 const
    43     VERSION = "v0.1.0"
    47     VERSION = "v0.1.0"
    44     USAGE = """
    48     USAGE = """
    45 ./netdata_tsrelay --dbopts="[PostgreSQL connection string]" --listen-port=14866
    49 ./netdata_tsrelay [-q][-v][-h] --dbopts="[PostgreSQL connection string]" --listen-port=14866 --listen-addr=0.0.0.0
       
    50 
       
    51   -q: Quiet mode.  No output at all.  Ignored if -d is supplied.
       
    52   -c: Suppress ANSI color output.
       
    53   -d: Debug: Show incoming and parsed data.
       
    54   -v: Display version number.
       
    55   -h: Help.  You're lookin' at it.
    46 
    56 
    47 The default connection string is:
    57 The default connection string is:
    48   "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay"
    58   "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay"
    49     """
    59     """
    50     INSERT_SQL = """
    60     INSERT_SQL = """
    53     VALUES
    63     VALUES
    54         ( 'epoch'::timestamptz + ? * '1 second'::interval, ?, ? )
    64         ( 'epoch'::timestamptz + ? * '1 second'::interval, ?, ? )
    55     """
    65     """
    56 
    66 
    57 
    67 
    58 type Config = object of RootObj
    68 type
    59     dbopts:      string  # The postgresql connection parameters.  (See https://www.postgresql.org/docs/current/static/libpq-connect.html)
    69     Config = object of RootObj
    60     listen_port: int     # The port to listen for incoming connections
    70         dbopts:      string  # The postgresql connection parameters.  (See https://www.postgresql.org/docs/current/static/libpq-connect.html)
    61 
    71         listen_port: int     # The port to listen for incoming connections
    62 # Global config object
    72         listen_addr: string  # The IP address listen for incoming connections.  Defaults to inaddr_any.
       
    73         verbose:     bool    # Be informative
       
    74         debug:       bool    # Spew out raw data
       
    75         use_color:   bool    # Pretty things up a little, probably want to disable this if debugging
       
    76 
       
    77 
       
    78 # The global config object
       
    79 #
       
    80 # FIXME:  Rather than pass this all over the
       
    81 # place, consider channels and createThread instead of spawn.
    63 #
    82 #
    64 var conf = Config(
    83 var conf = Config(
    65     dbopts: "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay",
    84     dbopts: "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay",
    66     listen_port: 14866
    85     listen_port: 14866,
       
    86     listen_addr: "0.0.0.0",
       
    87     verbose: true,
       
    88     debug: false,
       
    89     use_color: true
    67 )
    90 )
       
    91 
       
    92 
       
    93 proc hl( msg: string, fg: ForegroundColor, bright=false ): string =
       
    94     ## Quick wrapper for color formatting a string, since the 'terminal'
       
    95     ## module only deals with stdout directly.
       
    96     if not conf.use_color: return msg
       
    97 
       
    98     var color: BiggestInt = ord( fg )
       
    99     if bright: inc( color, 60 )
       
   100     result = "\e[" & $color & 'm' & msg & "\e[0m"
    68 
   101 
    69 
   102 
    70 proc fetch_data( client: Socket ): string =
   103 proc fetch_data( client: Socket ): string =
    71     ## Netdata JSON backend doesn't send a length, so we read line by
   104     ## Netdata JSON backend doesn't send a length, so we read line by
    72     ## line and wait for stream timeout to determine a "sample".
   105     ## line and wait for stream timeout to determine a "sample".
    76             result = result & client.recv_line( timeout=500 ) & "\n"
   109             result = result & client.recv_line( timeout=500 ) & "\n"
    77     except TimeoutError:
   110     except TimeoutError:
    78         discard
   111         discard
    79 
   112 
    80 
   113 
    81 proc parse_data( data: string ): Table[ BiggestInt, JsonNode ] =
   114 proc parse_data( data: string, conf: Config ): Table[ BiggestInt, JsonNode ] =
    82     ## Given a raw +data+ string, parse JSON and return a table of
   115     ## Given a raw +data+ string, parse JSON and return a table of
    83     ## JSON samples ready for writing, keyed by timestamp. Netdata can
   116     ## JSON samples ready for writing, keyed by timestamp. Netdata can
    84     ## buffer multiple samples in one batch.
   117     ## buffer multiple samples in one batch.
    85     if data == "": return
   118     if data == "": return
    86 
   119 
    87     # Hash of sample timeperiods to pivoted json data
   120     # Hash of sample timeperiods to pivoted json data
    88     result = init_table[ BiggestInt, JsonNode ]()
   121     result = init_table[ BiggestInt, JsonNode ]()
    89 
   122 
    90     for sample in split_lines( data ):
   123     for sample in split_lines( data ):
    91         if defined( testing ): echo sample
   124         if conf.debug: echo sample.hl( fgBlack, bright=true )
    92         if sample.len == 0: continue
   125         if sample.len == 0: continue
    93 
   126 
    94         var parsed: JsonNode
   127         var parsed: JsonNode
    95         try:
   128         try:
    96             parsed = sample.parse_json
   129             parsed = sample.parse_json
    97         except JsonParsingError:
   130         except JsonParsingError:
    98             if defined( testing ): echo "Unable to parse sample line: " & sample
   131             discard
       
   132             if conf.debug: echo hl( "Unable to parse sample line: " & sample.hl(fgRed, bright=true), fgRed )
    99 
   133 
   100         # Create or use existing Json object for modded data.
   134         # Create or use existing Json object for modded data.
   101         #
   135         #
   102         var pivot: JsonNode
   136         var pivot: JsonNode
   103         let key = parsed["timestamp"].get_num
   137         let key = parsed["timestamp"].get_num
   110 
   144 
   111         var name = parsed[ "chart_id" ].get_str & "." & parsed[ "id" ].get_str
   145         var name = parsed[ "chart_id" ].get_str & "." & parsed[ "id" ].get_str
   112         pivot[ "hostname" ] = parsed[ "hostname" ]
   146         pivot[ "hostname" ] = parsed[ "hostname" ]
   113         pivot[ name ] = parsed[ "value" ]
   147         pivot[ name ] = parsed[ "value" ]
   114 
   148 
   115     if defined( testing ): echo $result.len & " samples"
       
   116 
       
   117     return result
   149     return result
   118 
   150 
   119 
   151 
   120 proc process( client: Socket, db: DBConn ): void =
   152 proc process( client: Socket, db: DBConn, conf: Config ): int =
   121     ## Do the work for a connected client within a thread.
   153     ## Do the work for a connected client within a thread.
       
   154     ## Returns the number of samples parsed.
   122     var raw_data = client.fetch_data
   155     var raw_data = client.fetch_data
   123 
   156 
       
   157     # Done with the socket, netdata will automatically
       
   158     # reconnect.  Save local resources/file descriptors
       
   159     # by closing after the send is considered complete.
       
   160     #
   124     try:
   161     try:
   125         if defined( testing ):
       
   126             echo "Closed connection for " & get_peer_addr( client.get_fd, get_sock_domain(client.get_fd) )[0]
       
   127         client.close
   162         client.close
   128     except OSError:
   163     except OSError:
   129         return
   164         return
   130 
   165 
   131     var samples = parse_data( raw_data )
   166     # Pivot data and save to SQL.
   132     if samples.len == 0: return
   167     #
   133 
   168     var samples = parse_data( raw_data, conf )
   134     db.exec sql( "BEGIN" )
   169     if samples.len != 0:
   135     for timestamp, sample in samples:
   170         db.exec sql( "BEGIN" )
   136         var host = sample[ "hostname" ].get_str
   171         for timestamp, sample in samples:
   137         sample.delete( "hostname" )
   172             var host = sample[ "hostname" ].get_str
   138         db.exec sql( INSERT_SQL ), timestamp, host, sample
   173             sample.delete( "hostname" )
   139     db.exec sql( "COMMIT" )
   174             db.exec sql( INSERT_SQL ), timestamp, host, sample
       
   175         db.exec sql( "COMMIT" )
       
   176 
       
   177     return samples.len
       
   178 
       
   179 
       
   180 proc runthread( client: Socket, address: string, db: DBConn, conf: Config ): void {.thread.} =
       
   181     ## A thread that performs that dispatches processing and returns
       
   182     ## results.
       
   183     let t0 = cpu_time()
       
   184     var samples = client.process( db, conf )
       
   185 
       
   186     if conf.verbose:
       
   187         echo(
       
   188             hl( $samples, fgWhite, bright=true ),
       
   189             " sample(s) parsed from ",
       
   190             address.hl( fgYellow, bright=true ),
       
   191             " in ", hl($( round(cpu_time() - t0, 3) ), fgWhite, bright=true), " seconds."
       
   192             # " ", hl($(round((get_occupied_mem()/1024/1024),1)), fgWhite, bright=true), "MB memory used."
       
   193         )
       
   194     when defined( testing ): dumpNumberOfInstances()
       
   195 
   140 
   196 
   141 proc serverloop: void =
   197 proc serverloop: void =
   142     ## Open a database connection, bind to the listening socket,
   198     ## Open a database connection, bind to the listening socket,
   143     ## and start serving incoming netdata streams.
   199     ## and start serving incoming netdata streams.
   144     let db = open( "", "", "", conf.dbopts )
   200     let db = open( "", "", "", conf.dbopts )
   145     echo "Successfully connected to the backend database."
   201     if conf.verbose: echo( "Successfully connected to the backend database.".hl( fgGreen ) )
   146     
   202 
   147     var server = newSocket()
   203     var
   148     echo "Listening for incoming connections on port ", conf.listen_port, "..."
   204         server = newSocket()
       
   205         client = newSocket()
       
   206 
   149     server.set_sock_opt( OptReuseAddr, true )
   207     server.set_sock_opt( OptReuseAddr, true )
   150     server.bind_addr( Port(conf.listen_port) )
   208     server.bind_addr( Port(conf.listen_port), conf.listen_addr )
   151     server.listen()
   209     server.listen()
   152 
   210 
       
   211     if conf.verbose:
       
   212         echo(
       
   213             "Listening for incoming connections on ".hl( fgGreen, bright=true ),
       
   214             hl( (if conf.listen_addr == "0.0.0.0": "*" else: conf.listen_addr) , fgBlue, bright=true ),
       
   215             ":",
       
   216             hl( $conf.listen_port, fgBlue, bright=true ),
       
   217         )
       
   218         echo ""
       
   219 
   153     while true:
   220     while true:
   154         var
   221         var address = ""
   155             client  = newSocket()
   222         server.acceptAddr( client, address ) # blocking call
   156             address = ""
   223         spawn runthread( client, address, db, conf )
   157 
   224 
   158         server.acceptAddr( client, address )
   225 
   159         echo "New connection: " & address
   226 proc atexit() {.noconv.} =
   160         spawn client.process( db )
   227     ## Exit cleanly after waiting on any running threads.
       
   228     echo "Exiting..."
       
   229     sync()
       
   230     quit( 0 )
   161 
   231 
   162 
   232 
   163 proc parse_cmdline: void =
   233 proc parse_cmdline: void =
   164     ## Populate the config object with the user's preferences.
   234     ## Populate the config object with the user's preferences.
       
   235 
       
   236     # always set debug mode if development build.
       
   237     conf.debug = defined( testing )
       
   238 
   165     for kind, key, val in getopt():
   239     for kind, key, val in getopt():
   166         case kind
   240         case kind
   167 
   241 
   168         of cmdArgument:
   242         of cmdArgument:
   169             discard
   243             discard
   170 
   244 
   171         of cmdLongOption, cmdShortOption:
   245         of cmdLongOption, cmdShortOption:
   172             case key
   246             case key
       
   247                 of "debug", "d":
       
   248                     conf.debug = true
       
   249 
       
   250                 of "no-color", "c":
       
   251                     conf.use_color = false
       
   252 
   173                 of "help", "h":
   253                 of "help", "h":
   174                     echo USAGE
   254                     echo USAGE
   175                     quit( 0 )
   255                     quit( 0 )
       
   256 
       
   257                 of "quiet", "q":
       
   258                     conf.verbose = false
   176             
   259             
   177                 of "version", "v":
   260                 of "version", "v":
   178                     echo "netdata_tsrelay ", VERSION
   261                     echo hl( "netdata_tsrelay " & VERSION, fgWhite, bright=true )
   179                     quit( 0 )
   262                     quit( 0 )
   180                
   263                
   181                 of "dbopts": conf.dbopts = val
   264                 of "dbopts": conf.dbopts = val
       
   265                 of "listen-addr", "a": conf.listen_addr = val
   182                 of "listen-port", "p": conf.listen_port = val.parse_int
   266                 of "listen-port", "p": conf.listen_port = val.parse_int
   183 
   267 
   184                 else: discard
   268                 else: discard
   185 
   269 
   186         of cmdEnd: assert( false ) # shouldn't reach here ever
   270         of cmdEnd: assert( false ) # shouldn't reach here ever
   187 
   271 
   188 
   272 
   189 when isMainModule:
   273 when isMainModule:
       
   274     system.addQuitProc( resetAttributes )
       
   275     system.addQuitProc( atexit )
       
   276 
   190     parse_cmdline()
   277     parse_cmdline()
   191     if defined( testing ): echo conf
   278 
       
   279     if conf.debug: echo hl( $conf, fgYellow )
   192     serverloop()
   280     serverloop()
   193 
   281