32 db_postgres, |
32 db_postgres, |
33 json, |
33 json, |
34 math, |
34 math, |
35 nativesockets, |
35 nativesockets, |
36 net, |
36 net, |
37 os, |
|
38 parseopt, |
37 parseopt, |
39 posix, |
38 posix, |
40 strutils, |
39 strutils, |
|
40 strformat, |
41 tables, |
41 tables, |
42 terminal, |
42 terminal, |
43 times |
43 times |
44 |
44 |
45 |
45 |
46 const |
46 const |
47 VERSION = "v0.2.0" |
47 VERSION = "v0.3.0" |
48 USAGE = """ |
48 USAGE = """ |
49 ./netdata_tsrelay [-d][-h][-q][-t][-T][-v] --dbopts="[PostgreSQL connection string]" --listen-port=14866 --listen-addr=0.0.0.0 |
49 ./netdata_tsrelay [-adDhopqtTv] |
50 |
50 |
51 -q: Quiet mode. No output at all. Ignored if -d is supplied. |
51 -a --listen-addr: |
52 -d: Debug: Show incoming and parsed data. |
52 The outbound IP address to listen for netdata streams. |
53 -v: Display version number. |
53 |
54 -T: Change the destination table name from the default 'netdata'. |
54 -d --debug: |
55 -t: Alter the maximum time (in ms) an open socket waits for data. Default: 500ms. |
55 Debug: Show incoming and parsed data. |
56 -h: Help. You're lookin' at it. |
56 |
57 |
57 -D --dropconn: |
58 The default connection string is: |
58 Drop the persistent socket to netdata between samples to conserve |
59 "host=localhost dbname=netdata application_name=netdata-tsrelay" |
59 local resources. This may be helpful with a large number of clients. |
|
60 Defaults to false. |
|
61 |
|
62 -h --help: |
|
63 Help. You're lookin' at it. |
|
64 |
|
65 -o --dbopts: |
|
66 The PostgreSQL connection string parameters. |
|
67 The default connection string is: |
|
68 "host=localhost dbname=netdata application_name=netdata-tsrelay" |
|
69 |
|
70 -p --listen-port: |
|
71 Change the listening port from the default (14866). |
|
72 |
|
73 -P --persistent: |
|
74 Don't disconnect from the database between samples. This may be |
|
75 more efficient with a small number of clients, when not using a |
|
76 pooler, or with a very high sample size/rate. Defaults to false. |
|
77 |
|
78 -q --quiet: |
|
79 Quiet mode. No output at all. Ignored if -d is supplied. |
|
80 |
|
81 -T --dbtable: |
|
82 Change the destination table name from the default (netdata). |
|
83 |
|
84 -t --timeout: |
|
85 Alter the maximum time (in ms) an open socket waits for data |
|
86 before processing the sample. Default: 500ms. |
|
87 |
|
88 -v --verbose: |
|
89 Display version number. |
|
90 |
60 """ |
91 """ |
61 INSERT_SQL = """ |
92 INSERT_SQL = """ |
62 INSERT INTO $1 |
93 INSERT INTO $1 |
63 ( time, host, metrics ) |
94 ( time, host, metrics ) |
64 VALUES |
95 VALUES |
68 |
99 |
type
    Config = object of RootObj
        ## Runtime settings for the relay, held in the global `conf`.
        dbopts: string      # The postgresql connection parameters.  (See https://www.postgresql.org/docs/current/static/libpq-connect.html)
        dbtable: string     # The name of the table to write to.
        dropconn: bool      # Close the TCP connection between samples.
        persistent: bool    # Don't close the database handle between samples.
        listen_port: int    # The port to listen on for incoming connections.
        listen_addr: string # The IP address to listen on for incoming connections.  Defaults to inaddr_any.
        verbose: bool       # Be informative
        debug: bool         # Spew out raw data
        insertsql: string   # The SQL insert string after interpolating the table name.
        timeout: int        # How long to block, waiting on connection data.
79 |
112 |
|
113 |
|
type
    NetdataClient = ref object
        ## Per-connection state for one connected netdata sender.
        sock: Socket    # The accepted client socket
        address: string # The remote IP address, as filled in by acceptAddr
        db: DbConn      # An optionally persistent database handle (nil when closed)
|
119 |
|
120 |
80 # Global configuration |
121 # Global configuration |
81 var conf: Config |
122 var conf: Config |
82 |
123 |
83 |
124 |
84 proc hl( msg: string, fg: ForegroundColor, bright=false ): string = |
125 proc hl( msg: string, fg: ForegroundColor, bright=false ): string = |
89 var color: BiggestInt = ord( fg ) |
130 var color: BiggestInt = ord( fg ) |
90 if bright: inc( color, 60 ) |
131 if bright: inc( color, 60 ) |
91 result = "\e[" & $color & 'm' & msg & "\e[0m" |
132 result = "\e[" & $color & 'm' & msg & "\e[0m" |
92 |
133 |
93 |
134 |
proc fetch_data( client: NetdataClient ): string =
    ## Netdata JSON backend doesn't send a length nor a separator
    ## between samples, so we read line by line and wait for stream
    ## timeout to determine what constitutes a sample.
    ##
    ## Returns the lines read so far, each terminated with a newline.
    ## A read timeout with nothing buffered keeps waiting; a timeout
    ## with data buffered ends the sample.
    ##
    ## If the peer closes the socket while lines are already buffered,
    ## those lines are returned so they can still be stored; the next
    ## call then sees the closed socket and exits the process.  With
    ## nothing buffered, a closed socket or socket error exits the
    ## process with status 1.
    var buf = ""
    while true:
        try:
            client.sock.readline( buf, timeout=conf.timeout )
        except OSError:
            quit( 1 )
        except TimeoutError:
            if result == "": continue
            return

        # readline() leaves the buffer empty only when the peer has
        # disconnected (a blank line is delivered as "\r\n").
        if buf == "":
            # Don't throw away a sample that was already received.
            if result != "": return
            if conf.debug: echo "Client {client.address} closed socket.".fmt.hl( fgRed, bright=true )
            quit( 1 )

        # Append in place rather than rebuilding the string per line.
        result.add( buf )
        result.add( '\n' )
104 |
154 |
105 |
155 |
106 proc parse_data( data: string ): seq[ JsonNode ] = |
156 proc parse_data( data: string ): seq[ JsonNode ] = |
107 ## Given a raw +data+ string, parse JSON and return a sequence |
157 ## Given a raw +data+ string, parse JSON and return a sequence |
108 ## of JSON samples. Netdata can buffer multiple samples in one batch. |
158 ## of JSON samples. Netdata can buffer multiple samples in one batch. |
145 |
195 |
146 for timestamp, sample in pivoted_data: |
196 for timestamp, sample in pivoted_data: |
147 result.add( sample ) |
197 result.add( sample ) |
148 |
198 |
149 |
199 |
proc write_to_database( client: NetdataClient, samples: seq[ JsonNode ] ): void =
    ## Given a sequence of json samples, write them to database.
    ##
    ## All samples are inserted inside a single transaction using the
    ## client's database handle, which is opened on demand.  Errors
    ## raised while opening the connection propagate to the caller.
    ## Errors during the transaction are reported on stdout and the
    ## transaction is rolled back, so a persistent handle stays usable
    ## for the next batch.  Unless `conf.persistent` is set, the handle
    ## is closed before returning.
    ##
    ## Note: the "timestamp" and "hostname" keys are removed from each
    ## sample node as a side effect.
    if samples.len == 0: return

    if client.db.isNil:
        client.db = open( "", "", "", conf.dbopts )

    try:
        client.db.exec sql( "BEGIN" )
        for sample in samples:
            # Pull the column values out before stripping them from
            # the JSON blob that is stored as the metrics document.
            let
                timestamp = sample[ "timestamp" ].get_int
                host = sample[ "hostname" ].get_str.to_lowerascii
            sample.delete( "timestamp" )
            sample.delete( "hostname" )
            client.db.exec sql( conf.insertsql ), timestamp, host, sample
        client.db.exec sql( "COMMIT" )
    except CatchableError:
        let
            e = getCurrentException()
            msg = getCurrentExceptionMsg()
        echo "Got exception ", repr(e), " while writing to DB: ", msg

        # Leave the failed transaction.  Without this a persistent
        # connection stays in the aborted state and rejects every
        # later statement.
        try:
            client.db.exec sql( "ROLLBACK" )
        except DbError:
            # The connection itself is unusable; drop it so the next
            # call opens a fresh one.
            client.db.close
            client.db = nil

    if not conf.persistent and not client.db.isNil:
        client.db.close
        client.db = nil
proc process( client: NetdataClient ): void =
    ## Do the work for a connected client within child process.
    ##
    ## Reads one sample batch from the client, optionally drops the
    ## TCP connection, parses the batch, writes it to the database,
    ## and (when verbose) reports how many samples were handled and
    ## the CPU time spent.
    let
        t0 = cpu_time()
        raw_data = client.fetch_data

    # Done with the socket, netdata will automatically
    # reconnect.  Save local resources/file descriptors
    # by closing after the send is considered complete.
    #
    if conf.dropconn:
        try:
            client.sock.close
        except OSError:
            # The data has already been received in full, so a
            # failure to close must not cost us the sample.
            discard

    # Pivot the parsed data to a single JSON blob per sample time.
    let samples = parse_data( raw_data )
    client.write_to_database( samples )

    if conf.verbose:
        let cputime = cpu_time() - t0
        echo(
            hl( $(epochTime().to_int), fgMagenta, bright=true ),
            " ",
            hl( $(samples.len), fgWhite, bright=true ),
            " sample(s) parsed from ",
            client.address.hl( fgYellow, bright=true ),
            " in ", hl( "{cputime:<2.3f}".fmt, fgWhite, bright=true), " seconds."
        )
204 |
258 |
205 |
259 |
206 proc serverloop( conf: Config ): void = |
260 proc serverloop( conf: Config ): void = |
207 ## Open a database connection, bind to the listening socket, |
261 ## Open a database connection, bind to the listening socket, |
233 echo "" |
287 echo "" |
234 |
288 |
235 # Wait for incoming connections, fork for each client. |
289 # Wait for incoming connections, fork for each client. |
236 # |
290 # |
237 while true: |
291 while true: |
238 var |
292 let client = NetdataClient.new |
239 client = new Socket |
293 client.sock = Socket.new |
240 address = "" |
|
241 |
294 |
242 # Block, waiting for new connections. |
295 # Block, waiting for new connections. |
243 server.acceptAddr( client, address ) |
296 server.acceptAddr( client.sock, client.address ) |
244 |
297 |
245 if fork() == 0: |
298 if fork() == 0: |
246 server.close |
299 server.close |
247 client.process( address ) |
300 if conf.dropconn: |
248 quit( 0 ) |
301 # "one shot" mode. |
249 |
302 client.process |
250 client.close |
303 quit( 0 ) |
|
304 else: |
|
305 # Keep the connection to netdata open. |
|
306 while true: client.process |
|
307 |
|
308 client.sock.close |
251 when defined( testing ): dumpNumberOfInstances() |
309 when defined( testing ): dumpNumberOfInstances() |
252 |
310 |
253 |
311 |
254 proc parse_cmdline: Config = |
312 proc parse_cmdline: Config = |
255 ## Populate the config object with the user's preferences. |
313 ## Populate the config object with the user's preferences. |