53 VALUES |
63 VALUES |
54 ( 'epoch'::timestamptz + ? * '1 second'::interval, ?, ? ) |
64 ( 'epoch'::timestamptz + ? * '1 second'::interval, ?, ? ) |
55 """ |
65 """ |
56 |
66 |
57 |
67 |
58 type Config = object of RootObj |
68 type |
59 dbopts: string # The postgresql connection parameters. (See https://www.postgresql.org/docs/current/static/libpq-connect.html) |
69 Config = object of RootObj |
60 listen_port: int # The port to listen for incoming connections |
70 dbopts: string # The postgresql connection parameters. (See https://www.postgresql.org/docs/current/static/libpq-connect.html) |
61 |
71 listen_port: int # The port to listen for incoming connections |
62 # Global config object |
72 listen_addr: string # The IP address listen for incoming connections. Defaults to inaddr_any. |
|
73 verbose: bool # Be informative |
|
74 debug: bool # Spew out raw data |
|
75 use_color: bool # Pretty things up a little, probably want to disable this if debugging |
|
76 |
|
77 |
|
78 # The global config object |
|
79 # |
|
80 # FIXME: Rather than pass this all over the |
|
81 # place, consider channels and createThread instead of spawn. |
63 # |
82 # |
64 var conf = Config( |
83 var conf = Config( |
65 dbopts: "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay", |
84 dbopts: "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay", |
66 listen_port: 14866 |
85 listen_port: 14866, |
|
86 listen_addr: "0.0.0.0", |
|
87 verbose: true, |
|
88 debug: false, |
|
89 use_color: true |
67 ) |
90 ) |
|
91 |
|
92 |
|
93 proc hl( msg: string, fg: ForegroundColor, bright=false ): string = |
|
94 ## Quick wrapper for color formatting a string, since the 'terminal' |
|
95 ## module only deals with stdout directly. |
|
96 if not conf.use_color: return msg |
|
97 |
|
98 var color: BiggestInt = ord( fg ) |
|
99 if bright: inc( color, 60 ) |
|
100 result = "\e[" & $color & 'm' & msg & "\e[0m" |
68 |
101 |
69 |
102 |
70 proc fetch_data( client: Socket ): string = |
103 proc fetch_data( client: Socket ): string = |
71 ## Netdata JSON backend doesn't send a length, so we read line by |
104 ## Netdata JSON backend doesn't send a length, so we read line by |
72 ## line and wait for stream timeout to determine a "sample". |
105 ## line and wait for stream timeout to determine a "sample". |
76 result = result & client.recv_line( timeout=500 ) & "\n" |
109 result = result & client.recv_line( timeout=500 ) & "\n" |
77 except TimeoutError: |
110 except TimeoutError: |
78 discard |
111 discard |
79 |
112 |
80 |
113 |
81 proc parse_data( data: string ): Table[ BiggestInt, JsonNode ] = |
114 proc parse_data( data: string, conf: Config ): Table[ BiggestInt, JsonNode ] = |
82 ## Given a raw +data+ string, parse JSON and return a table of |
115 ## Given a raw +data+ string, parse JSON and return a table of |
83 ## JSON samples ready for writing, keyed by timestamp. Netdata can |
116 ## JSON samples ready for writing, keyed by timestamp. Netdata can |
84 ## buffer multiple samples in one batch. |
117 ## buffer multiple samples in one batch. |
85 if data == "": return |
118 if data == "": return |
86 |
119 |
87 # Hash of sample timeperiods to pivoted json data |
120 # Hash of sample timeperiods to pivoted json data |
88 result = init_table[ BiggestInt, JsonNode ]() |
121 result = init_table[ BiggestInt, JsonNode ]() |
89 |
122 |
90 for sample in split_lines( data ): |
123 for sample in split_lines( data ): |
91 if defined( testing ): echo sample |
124 if conf.debug: echo sample.hl( fgBlack, bright=true ) |
92 if sample.len == 0: continue |
125 if sample.len == 0: continue |
93 |
126 |
94 var parsed: JsonNode |
127 var parsed: JsonNode |
95 try: |
128 try: |
96 parsed = sample.parse_json |
129 parsed = sample.parse_json |
97 except JsonParsingError: |
130 except JsonParsingError: |
98 if defined( testing ): echo "Unable to parse sample line: " & sample |
131 discard |
|
132 if conf.debug: echo hl( "Unable to parse sample line: " & sample.hl(fgRed, bright=true), fgRed ) |
99 |
133 |
100 # Create or use existing Json object for modded data. |
134 # Create or use existing Json object for modded data. |
101 # |
135 # |
102 var pivot: JsonNode |
136 var pivot: JsonNode |
103 let key = parsed["timestamp"].get_num |
137 let key = parsed["timestamp"].get_num |
110 |
144 |
111 var name = parsed[ "chart_id" ].get_str & "." & parsed[ "id" ].get_str |
145 var name = parsed[ "chart_id" ].get_str & "." & parsed[ "id" ].get_str |
112 pivot[ "hostname" ] = parsed[ "hostname" ] |
146 pivot[ "hostname" ] = parsed[ "hostname" ] |
113 pivot[ name ] = parsed[ "value" ] |
147 pivot[ name ] = parsed[ "value" ] |
114 |
148 |
115 if defined( testing ): echo $result.len & " samples" |
|
116 |
|
117 return result |
149 return result |
118 |
150 |
119 |
151 |
120 proc process( client: Socket, db: DBConn ): void = |
152 proc process( client: Socket, db: DBConn, conf: Config ): int = |
121 ## Do the work for a connected client within a thread. |
153 ## Do the work for a connected client within a thread. |
|
154 ## Returns the number of samples parsed. |
122 var raw_data = client.fetch_data |
155 var raw_data = client.fetch_data |
123 |
156 |
|
157 # Done with the socket, netdata will automatically |
|
158 # reconnect. Save local resources/file descriptors |
|
159 # by closing after the send is considered complete. |
|
160 # |
124 try: |
161 try: |
125 if defined( testing ): |
|
126 echo "Closed connection for " & get_peer_addr( client.get_fd, get_sock_domain(client.get_fd) )[0] |
|
127 client.close |
162 client.close |
128 except OSError: |
163 except OSError: |
129 return |
164 return |
130 |
165 |
131 var samples = parse_data( raw_data ) |
166 # Pivot data and save to SQL. |
132 if samples.len == 0: return |
167 # |
133 |
168 var samples = parse_data( raw_data, conf ) |
134 db.exec sql( "BEGIN" ) |
169 if samples.len != 0: |
135 for timestamp, sample in samples: |
170 db.exec sql( "BEGIN" ) |
136 var host = sample[ "hostname" ].get_str |
171 for timestamp, sample in samples: |
137 sample.delete( "hostname" ) |
172 var host = sample[ "hostname" ].get_str |
138 db.exec sql( INSERT_SQL ), timestamp, host, sample |
173 sample.delete( "hostname" ) |
139 db.exec sql( "COMMIT" ) |
174 db.exec sql( INSERT_SQL ), timestamp, host, sample |
|
175 db.exec sql( "COMMIT" ) |
|
176 |
|
177 return samples.len |
|
178 |
|
179 |
|
180 proc runthread( client: Socket, address: string, db: DBConn, conf: Config ): void {.thread.} = |
|
181 ## A thread that performs that dispatches processing and returns |
|
182 ## results. |
|
183 let t0 = cpu_time() |
|
184 var samples = client.process( db, conf ) |
|
185 |
|
186 if conf.verbose: |
|
187 echo( |
|
188 hl( $samples, fgWhite, bright=true ), |
|
189 " sample(s) parsed from ", |
|
190 address.hl( fgYellow, bright=true ), |
|
191 " in ", hl($( round(cpu_time() - t0, 3) ), fgWhite, bright=true), " seconds." |
|
192 # " ", hl($(round((get_occupied_mem()/1024/1024),1)), fgWhite, bright=true), "MB memory used." |
|
193 ) |
|
194 when defined( testing ): dumpNumberOfInstances() |
|
195 |
140 |
196 |
141 proc serverloop: void = |
197 proc serverloop: void = |
142 ## Open a database connection, bind to the listening socket, |
198 ## Open a database connection, bind to the listening socket, |
143 ## and start serving incoming netdata streams. |
199 ## and start serving incoming netdata streams. |
144 let db = open( "", "", "", conf.dbopts ) |
200 let db = open( "", "", "", conf.dbopts ) |
145 echo "Successfully connected to the backend database." |
201 if conf.verbose: echo( "Successfully connected to the backend database.".hl( fgGreen ) ) |
146 |
202 |
147 var server = newSocket() |
203 var |
148 echo "Listening for incoming connections on port ", conf.listen_port, "..." |
204 server = newSocket() |
|
205 client = newSocket() |
|
206 |
149 server.set_sock_opt( OptReuseAddr, true ) |
207 server.set_sock_opt( OptReuseAddr, true ) |
150 server.bind_addr( Port(conf.listen_port) ) |
208 server.bind_addr( Port(conf.listen_port), conf.listen_addr ) |
151 server.listen() |
209 server.listen() |
152 |
210 |
|
211 if conf.verbose: |
|
212 echo( |
|
213 "Listening for incoming connections on ".hl( fgGreen, bright=true ), |
|
214 hl( (if conf.listen_addr == "0.0.0.0": "*" else: conf.listen_addr) , fgBlue, bright=true ), |
|
215 ":", |
|
216 hl( $conf.listen_port, fgBlue, bright=true ), |
|
217 ) |
|
218 echo "" |
|
219 |
153 while true: |
220 while true: |
154 var |
221 var address = "" |
155 client = newSocket() |
222 server.acceptAddr( client, address ) # blocking call |
156 address = "" |
223 spawn runthread( client, address, db, conf ) |
157 |
224 |
158 server.acceptAddr( client, address ) |
225 |
159 echo "New connection: " & address |
226 proc atexit() {.noconv.} = |
160 spawn client.process( db ) |
227 ## Exit cleanly after waiting on any running threads. |
|
228 echo "Exiting..." |
|
229 sync() |
|
230 quit( 0 ) |
161 |
231 |
162 |
232 |
163 proc parse_cmdline: void = |
233 proc parse_cmdline: void = |
164 ## Populate the config object with the user's preferences. |
234 ## Populate the config object with the user's preferences. |
|
235 |
|
236 # always set debug mode if development build. |
|
237 conf.debug = defined( testing ) |
|
238 |
165 for kind, key, val in getopt(): |
239 for kind, key, val in getopt(): |
166 case kind |
240 case kind |
167 |
241 |
168 of cmdArgument: |
242 of cmdArgument: |
169 discard |
243 discard |
170 |
244 |
171 of cmdLongOption, cmdShortOption: |
245 of cmdLongOption, cmdShortOption: |
172 case key |
246 case key |
|
247 of "debug", "d": |
|
248 conf.debug = true |
|
249 |
|
250 of "no-color", "c": |
|
251 conf.use_color = false |
|
252 |
173 of "help", "h": |
253 of "help", "h": |
174 echo USAGE |
254 echo USAGE |
175 quit( 0 ) |
255 quit( 0 ) |
|
256 |
|
257 of "quiet", "q": |
|
258 conf.verbose = false |
176 |
259 |
177 of "version", "v": |
260 of "version", "v": |
178 echo "netdata_tsrelay ", VERSION |
261 echo hl( "netdata_tsrelay " & VERSION, fgWhite, bright=true ) |
179 quit( 0 ) |
262 quit( 0 ) |
180 |
263 |
181 of "dbopts": conf.dbopts = val |
264 of "dbopts": conf.dbopts = val |
|
265 of "listen-addr", "a": conf.listen_addr = val |
182 of "listen-port", "p": conf.listen_port = val.parse_int |
266 of "listen-port", "p": conf.listen_port = val.parse_int |
183 |
267 |
184 else: discard |
268 else: discard |
185 |
269 |
186 of cmdEnd: assert( false ) # shouldn't reach here ever |
270 of cmdEnd: assert( false ) # shouldn't reach here ever |
187 |
271 |
188 |
272 |
189 when isMainModule: |
273 when isMainModule: |
|
274 system.addQuitProc( resetAttributes ) |
|
275 system.addQuitProc( atexit ) |
|
276 |
190 parse_cmdline() |
277 parse_cmdline() |
191 if defined( testing ): echo conf |
278 |
|
279 if conf.debug: echo hl( $conf, fgYellow ) |
192 serverloop() |
280 serverloop() |
193 |
281 |