70 dbopts: string # The postgresql connection parameters. (See https://www.postgresql.org/docs/current/static/libpq-connect.html) |
70 dbopts: string # The postgresql connection parameters. (See https://www.postgresql.org/docs/current/static/libpq-connect.html) |
71 listen_port: int # The port to listen for incoming connections |
71 listen_port: int # The port to listen for incoming connections |
72 listen_addr: string # The IP address listen for incoming connections. Defaults to inaddr_any. |
72 listen_addr: string # The IP address listen for incoming connections. Defaults to inaddr_any. |
73 verbose: bool # Be informative |
73 verbose: bool # Be informative |
74 debug: bool # Spew out raw data |
74 debug: bool # Spew out raw data |
75 use_color: bool # Pretty things up a little, probably want to disable this if debugging |
75 |
76 |
76 # Global configuration |
77 |
77 var conf: Config |
78 # The global config object |
|
79 # |
|
80 # FIXME: Rather than pass this all over the |
|
81 # place, consider channels and createThread instead of spawn. |
|
82 # |
|
83 var conf = Config( |
|
84 dbopts: "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay", |
|
85 listen_port: 14866, |
|
86 listen_addr: "0.0.0.0", |
|
87 verbose: true, |
|
88 debug: false, |
|
89 use_color: true |
|
90 ) |
|
91 |
|
92 |
78 |
93 proc hl( msg: string, fg: ForegroundColor, bright=false ): string = |
79 proc hl( msg: string, fg: ForegroundColor, bright=false ): string = |
94 ## Quick wrapper for color formatting a string, since the 'terminal' |
80 ## Quick wrapper for color formatting a string, since the 'terminal' |
95 ## module only deals with stdout directly. |
81 ## module only deals with stdout directly. |
96 if not conf.use_color: return msg |
82 if not isatty(stdout): return msg |
97 |
83 |
98 var color: BiggestInt = ord( fg ) |
84 var color: BiggestInt = ord( fg ) |
99 if bright: inc( color, 60 ) |
85 if bright: inc( color, 60 ) |
100 result = "\e[" & $color & 'm' & msg & "\e[0m" |
86 result = "\e[" & $color & 'm' & msg & "\e[0m" |
101 |
87 |
102 |
88 |
103 proc fetch_data( client: Socket ): string = |
89 proc fetch_data( client: Socket ): string = |
104 ## Netdata JSON backend doesn't send a length, so we read line by |
90 ## Netdata JSON backend doesn't send a length, so we read line by |
105 ## line and wait for stream timeout to determine a "sample". |
91 ## line and wait for stream timeout to determine a "sample". |
|
92 var buf: string = nil |
106 try: |
93 try: |
107 result = client.recv_line( timeout=500 ) & "\n" |
94 result = client.recv_line( timeout=500 ) |
108 while result != "": |
95 if result != "" and not result.is_nil: result = result & "\n" |
109 result = result & client.recv_line( timeout=500 ) & "\n" |
96 while buf != "": |
|
97 buf = client.recv_line( timeout=500 ) |
|
98 if buf != "" and not buf.is_nil: result = result & buf & "\n" |
110 except TimeoutError: |
99 except TimeoutError: |
111 discard |
100 discard |
112 |
101 |
113 |
102 |
114 proc parse_data( data: string, conf: Config ): Table[ BiggestInt, JsonNode ] = |
103 proc parse_data( data: string ): seq[ JsonNode ] = |
115 ## Given a raw +data+ string, parse JSON and return a table of |
104 ## Given a raw +data+ string, parse JSON and return a sequence |
116 ## JSON samples ready for writing, keyed by timestamp. Netdata can |
105 ## of JSON samples. Netdata can buffer multiple samples in one batch. |
117 ## buffer multiple samples in one batch. |
106 result = @[] |
118 if data == "": return |
107 if data == "" or data.is_nil: return |
119 |
108 |
120 # Hash of sample timeperiods to pivoted json data |
109 # Hash of sample timeperiods to pivoted json data |
121 result = init_table[ BiggestInt, JsonNode ]() |
110 var pivoted_data = init_table[ BiggestInt, JsonNode ]() |
122 |
111 |
123 for sample in split_lines( data ): |
112 for sample in split_lines( data ): |
124 if conf.debug: echo sample.hl( fgBlack, bright=true ) |
113 if sample == "" or sample.is_nil: continue |
125 if sample.len == 0: continue |
114 #if conf.debug: echo sample.hl( fgBlack, bright=true ) |
126 |
115 |
127 var parsed: JsonNode |
116 var parsed: JsonNode |
128 try: |
117 try: |
129 parsed = sample.parse_json |
118 parsed = sample.parse_json |
130 except JsonParsingError: |
119 except JsonParsingError: |
131 discard |
|
132 if conf.debug: echo hl( "Unable to parse sample line: " & sample.hl(fgRed, bright=true), fgRed ) |
120 if conf.debug: echo hl( "Unable to parse sample line: " & sample.hl(fgRed, bright=true), fgRed ) |
|
121 continue |
|
122 if parsed.kind != JObject: return |
133 |
123 |
134 # Create or use existing Json object for modded data. |
124 # Create or use existing Json object for modded data. |
135 # |
125 # |
136 var pivot: JsonNode |
126 var pivot: JsonNode |
137 let key = parsed["timestamp"].get_num |
127 try: |
138 |
128 let key = parsed["timestamp"].get_num |
139 if result.has_key( key ): |
129 |
140 pivot = result[ key ] |
130 if pivoted_data.has_key( key ): |
141 else: |
131 pivot = pivoted_data[ key ] |
142 pivot = newJObject() |
132 else: |
143 result[ key ] = pivot |
133 pivot = newJObject() |
144 |
134 pivoted_data[ key ] = pivot |
145 var name = parsed[ "chart_id" ].get_str & "." & parsed[ "id" ].get_str |
135 |
146 pivot[ "hostname" ] = parsed[ "hostname" ] |
136 var name = parsed[ "chart_id" ].get_str & "." & parsed[ "id" ].get_str |
147 pivot[ name ] = parsed[ "value" ] |
137 pivot[ "hostname" ] = parsed[ "hostname" ] |
148 |
138 pivot[ "timestamp" ] = parsed[ "timestamp" ] |
149 return result |
139 pivot[ name ] = parsed[ "value" ] |
150 |
140 except: |
151 |
141 continue |
152 proc process( client: Socket, db: DBConn, conf: Config ): int = |
142 |
|
143 for timestamp, sample in pivoted_data: |
|
144 result.add( sample ) |
|
145 |
|
146 |
|
147 proc write_to_database( samples: seq[ JsonNode ] ): void = |
|
148 ## Given a sequence of json samples, write them to database. |
|
149 if samples.len == 0: return |
|
150 |
|
151 let db = open( "", "", "", conf.dbopts ) |
|
152 |
|
153 try: |
|
154 db.exec sql( "BEGIN" ) |
|
155 for sample in samples: |
|
156 var |
|
157 timestamp = sample[ "timestamp" ].get_num |
|
158 host = sample[ "hostname" ].get_str |
|
159 sample.delete( "timestamp" ) |
|
160 sample.delete( "hostname" ) |
|
161 db.exec sql( INSERT_SQL ), timestamp, host, sample |
|
162 db.exec sql( "COMMIT" ) |
|
163 except: |
|
164 let |
|
165 e = getCurrentException() |
|
166 msg = getCurrentExceptionMsg() |
|
167 echo "Got exception ", repr(e), " while writing to DB: ", msg |
|
168 discard |
|
169 |
|
170 db.close |
|
171 |
|
172 |
|
173 proc process( client: Socket, address: string ): void = |
153 ## Do the work for a connected client within a thread. |
174 ## Do the work for a connected client within a thread. |
154 ## Returns the number of samples parsed. |
175 ## Returns the formatted json data keyed on sample time. |
|
176 let t0 = cpu_time() |
155 var raw_data = client.fetch_data |
177 var raw_data = client.fetch_data |
156 |
178 |
157 # Done with the socket, netdata will automatically |
179 # Done with the socket, netdata will automatically |
158 # reconnect. Save local resources/file descriptors |
180 # reconnect. Save local resources/file descriptors |
159 # by closing after the send is considered complete. |
181 # by closing after the send is considered complete. |
161 try: |
183 try: |
162 client.close |
184 client.close |
163 except OSError: |
185 except OSError: |
164 return |
186 return |
165 |
187 |
166 # Pivot data and save to SQL. |
188 # Pivot the parsed data to a single JSON blob per sample time. |
167 # |
189 var samples = parse_data( raw_data ) |
168 var samples = parse_data( raw_data, conf ) |
190 write_to_database( samples ) |
169 if samples.len != 0: |
|
170 db.exec sql( "BEGIN" ) |
|
171 for timestamp, sample in samples: |
|
172 var host = sample[ "hostname" ].get_str |
|
173 sample.delete( "hostname" ) |
|
174 db.exec sql( INSERT_SQL ), timestamp, host, sample |
|
175 db.exec sql( "COMMIT" ) |
|
176 |
|
177 return samples.len |
|
178 |
|
179 |
|
180 proc runthread( client: Socket, address: string, db: DBConn, conf: Config ): void {.thread.} = |
|
181 ## A thread that performs that dispatches processing and returns |
|
182 ## results. |
|
183 let t0 = cpu_time() |
|
184 var samples = client.process( db, conf ) |
|
185 |
191 |
186 if conf.verbose: |
192 if conf.verbose: |
187 echo( |
193 echo( |
188 hl( $samples, fgWhite, bright=true ), |
194 hl( $(epochTime().to_int), fgMagenta, bright=true ), |
|
195 " ", |
|
196 hl( $(samples.len), fgWhite, bright=true ), |
189 " sample(s) parsed from ", |
197 " sample(s) parsed from ", |
190 address.hl( fgYellow, bright=true ), |
198 address.hl( fgYellow, bright=true ), |
191 " in ", hl($( round(cpu_time() - t0, 3) ), fgWhite, bright=true), " seconds." |
199 " in ", hl($( round(cpu_time() - t0, 3) ), fgWhite, bright=true), " seconds." |
192 # " ", hl($(round((get_occupied_mem()/1024/1024),1)), fgWhite, bright=true), "MB memory used." |
200 # " ", hl($(round((get_occupied_mem()/1024/1024),1)), fgWhite, bright=true), "MB memory used." |
193 ) |
201 ) |
194 |
202 |
195 |
203 |
196 proc serverloop: void = |
204 proc serverloop( conf: Config ): void = |
197 ## Open a database connection, bind to the listening socket, |
205 ## Open a database connection, bind to the listening socket, |
198 ## and start serving incoming netdata streams. |
206 ## and start serving incoming netdata streams. |
199 let db = open( "", "", "", conf.dbopts ) |
207 let db = open( "", "", "", conf.dbopts ) |
200 if conf.verbose: echo( "Successfully connected to the backend database.".hl( fgGreen ) ) |
208 if conf.verbose: echo( "Successfully tested connection to the backend database.".hl( fgGreen ) ) |
201 |
209 db.close |
202 var |
210 |
203 conn_count = 0 |
211 # Setup listening socket. |
204 server = newSocket() |
212 # |
205 |
213 var server = newSocket() |
206 server.set_sock_opt( OptReuseAddr, true ) |
214 server.set_sock_opt( OptReuseAddr, true ) |
207 server.bind_addr( Port(conf.listen_port), conf.listen_addr ) |
215 server.bind_addr( Port(conf.listen_port), conf.listen_addr ) |
208 server.listen() |
216 server.listen() |
209 |
217 |
210 if conf.verbose: |
218 if conf.verbose: |
214 ":", |
222 ":", |
215 hl( $conf.listen_port, fgBlue, bright=true ), |
223 hl( $conf.listen_port, fgBlue, bright=true ), |
216 ) |
224 ) |
217 echo "" |
225 echo "" |
218 |
226 |
|
227 # Wait for incoming connections, fork for each client. |
|
228 # |
219 while true: |
229 while true: |
220 var client = newSocket() |
230 var |
221 var address = "" |
231 client = new Socket |
222 |
232 address = "" |
223 # Force a garbage collection pass. |
233 status: cint = 0 |
224 # |
234 |
225 conn_count = conn_count + 1 |
235 server.acceptAddr( client, address ) # block |
226 if conn_count == 25: |
236 |
227 when defined( testing ): echo "Forcing GC pass." |
237 if fork() == 0: |
228 GC_full_collect() |
238 server.close |
229 conn_count = 1 |
239 client.process( address ) |
|
240 quit( 0 ) |
230 |
241 |
231 client.close |
242 client.close |
232 server.acceptAddr( client, address ) # blocking call |
243 |
233 spawn runthread( client, address, db, conf ) |
244 discard waitpid( P_ALL, status, WNOHANG ) # reap all previous children |
234 when defined( testing ): dumpNumberOfInstances() |
245 when defined( testing ): dumpNumberOfInstances() |
235 |
246 |
236 |
247 |
237 proc parse_cmdline: void = |
248 proc parse_cmdline: Config = |
238 ## Populate the config object with the user's preferences. |
249 ## Populate the config object with the user's preferences. |
239 |
250 |
|
251 # Config object defaults. |
|
252 # |
|
253 result = Config( |
|
254 dbopts: "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay", |
|
255 listen_port: 14866, |
|
256 listen_addr: "0.0.0.0", |
|
257 verbose: true, |
|
258 debug: false |
|
259 ) |
|
260 |
240 # always set debug mode if development build. |
261 # always set debug mode if development build. |
241 conf.debug = defined( testing ) |
262 result.debug = defined( testing ) |
242 |
263 |
243 for kind, key, val in getopt(): |
264 for kind, key, val in getopt(): |
244 case kind |
265 case kind |
245 |
266 |
246 of cmdArgument: |
267 of cmdArgument: |
247 discard |
268 discard |
248 |
269 |
249 of cmdLongOption, cmdShortOption: |
270 of cmdLongOption, cmdShortOption: |
250 case key |
271 case key |
251 of "debug", "d": |
272 of "debug", "d": |
252 conf.debug = true |
273 result.debug = true |
253 |
|
254 of "no-color", "c": |
|
255 conf.use_color = false |
|
256 |
274 |
257 of "help", "h": |
275 of "help", "h": |
258 echo USAGE |
276 echo USAGE |
259 quit( 0 ) |
277 quit( 0 ) |
260 |
278 |
261 of "quiet", "q": |
279 of "quiet", "q": |
262 conf.verbose = false |
280 result.verbose = false |
263 |
281 |
264 of "version", "v": |
282 of "version", "v": |
265 echo hl( "netdata_tsrelay " & VERSION, fgWhite, bright=true ) |
283 echo hl( "netdata_tsrelay " & VERSION, fgWhite, bright=true ) |
266 quit( 0 ) |
284 quit( 0 ) |
267 |
285 |
268 of "dbopts": conf.dbopts = val |
286 of "dbopts": result.dbopts = val |
269 of "listen-addr", "a": conf.listen_addr = val |
287 of "listen-addr", "a": result.listen_addr = val |
270 of "listen-port", "p": conf.listen_port = val.parse_int |
288 of "listen-port", "p": result.listen_port = val.parse_int |
271 |
289 |
272 else: discard |
290 else: discard |
273 |
291 |
274 of cmdEnd: assert( false ) # shouldn't reach here ever |
292 of cmdEnd: assert( false ) # shouldn't reach here ever |
275 |
293 |
276 |
294 |
277 when isMainModule: |
295 when isMainModule: |
278 system.addQuitProc( resetAttributes ) |
296 system.addQuitProc( resetAttributes ) |
279 |
297 |
280 parse_cmdline() |
298 conf = parse_cmdline() |
281 if conf.debug: echo hl( $conf, fgYellow ) |
299 if conf.debug: echo hl( $conf, fgYellow ) |
282 |
300 |
283 serverloop() |
301 serverloop( conf ) |
284 |
302 |