|
1 # vim: set et nosta sw=4 ts=4 ft=nim : |
|
2 # |
|
3 # Copyright (c) 2018, Mahlon E. Smith <mahlon@martini.nu> |
|
4 # All rights reserved. |
|
5 # Redistribution and use in source and binary forms, with or without |
|
6 # modification, are permitted provided that the following conditions are met: |
|
7 # |
|
8 # * Redistributions of source code must retain the above copyright |
|
9 # notice, this list of conditions and the following disclaimer. |
|
10 # |
|
11 # * Redistributions in binary form must reproduce the above copyright |
|
12 # notice, this list of conditions and the following disclaimer in the |
|
13 # documentation and/or other materials provided with the distribution. |
|
14 # |
|
15 # * Neither the name of Mahlon E. Smith nor the names of his |
|
16 # contributors may be used to endorse or promote products derived |
|
17 # from this software without specific prior written permission. |
|
18 # |
|
19 # THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY |
|
20 # EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
21 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE |
|
22 # DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY |
|
23 # DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES |
|
24 # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; |
|
25 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND |
|
26 # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
|
27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
|
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
|
29 |
|
30 |
|
31 import |
|
32 db_postgres, |
|
33 json, |
|
34 nativesockets, |
|
35 net, |
|
36 parseopt2, |
|
37 strutils, |
|
38 tables, |
|
39 threadpool |
|
40 |
|
41 |
|
# Release identifier, printed by the --version switch.
const VERSION = "v0.1.0"

# Help text, printed by the --help switch.
const USAGE = """
./netdata_tsrelay --dbopts="[PostgreSQL connection string]" --listen-port=14866

The default connection string is:
"host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay"
"""

# Parameterised insert statement. The three placeholders are bound, in
# order, to the sample timestamp (added to the epoch as a number of
# seconds), the originating host, and the metrics document.
const INSERT_SQL = """
INSERT INTO netdata
( time, host, metrics )
VALUES
( 'epoch'::timestamptz + ? * '1 second'::interval, ?, ? )
"""
|
56 |
|
57 |
|
type
    Config = object of RootObj
        # PostgreSQL connection parameters.
        # (See https://www.postgresql.org/docs/current/static/libpq-connect.html)
        dbopts: string
        # Port on which incoming connections are accepted.
        listen_port: int

# Program-wide settings, pre-populated with the defaults. The command
# line parser overwrites individual fields when the matching switch is
# supplied.
#
var conf = Config(
    listen_port: 14866,
    dbopts: "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay"
)
|
68 |
|
69 |
|
proc fetch_data( client: Socket ): string =
    ## Read everything the +client+ has to send and return it as
    ## newline terminated lines.
    ##
    ## Netdata JSON backend doesn't send a length, so we read line by
    ## line and treat either a disconnect or 500ms of silence on the
    ## stream as the end of a "sample". Returns an empty string when
    ## nothing was received at all.
    result = ""
    try:
        while true:
            let line = client.recv_line( timeout=500 )

            # recv_line hands back "" once the peer has disconnected
            # (a genuinely blank line comes back as "\r\n"). Without
            # this check the loop would spin forever on a closed
            # socket, appending newlines to the buffer.
            if line == "": break

            result.add( line )
            result.add( "\n" )
    except TimeoutError:
        # The stream went quiet: what was collected so far is the sample.
        discard
|
79 |
|
80 |
|
proc parse_data( data: string ): Table[ BiggestInt, JsonNode ] =
    ## Given a raw +data+ string, parse JSON and return a table of
    ## JSON samples ready for writing, keyed by timestamp. Netdata can
    ## buffer multiple samples in one batch.
    ##
    ## Every line of +data+ is expected to be one JSON object carrying
    ## the "timestamp", "hostname", "chart_id", "id" and "value" keys.
    ## Lines sharing a timestamp are pivoted into a single object that
    ## maps "<chart_id>.<id>" to the value, plus a "hostname" entry.
    ## Blank lines, lines that are not valid JSON, and objects missing
    ## any of the required keys are skipped. Empty input yields an
    ## empty table.

    # Hash of sample timeperiods to pivoted json data. Initialised
    # before the early return so callers always get a usable table.
    result = init_table[ BiggestInt, JsonNode ]()
    if data == "": return

    for sample in split_lines( data ):
        if defined( testing ): echo sample
        if sample.len == 0: continue

        var parsed: JsonNode
        try:
            parsed = sample.parse_json
        except JsonParsingError:
            if defined( testing ): echo "Unable to parse sample line: " & sample
            # Nothing usable on this line; carrying on would
            # dereference an unset node below.
            continue

        # The data arrives from the network, so make sure the fields
        # that are read below are really there before touching them.
        if parsed.kind != JObject: continue
        var complete = true
        for field in [ "timestamp", "hostname", "chart_id", "id", "value" ]:
            if not parsed.has_key( field ):
                complete = false
                break
        if not complete:
            if defined( testing ): echo "Skipping incomplete sample line: " & sample
            continue

        # Create or use existing Json object for modded data.
        #
        let key = parsed[ "timestamp" ].get_num
        if not result.has_key( key ):
            result[ key ] = newJObject()
        let pivot = result[ key ]

        let name = parsed[ "chart_id" ].get_str & "." & parsed[ "id" ].get_str
        pivot[ "hostname" ] = parsed[ "hostname" ]
        pivot[ name ] = parsed[ "value" ]

    if defined( testing ): echo $result.len & " samples"
|
118 |
|
119 |
|
proc process( client: Socket, db: DBConn ): void =
    ## Handle a single connected +client+: drain what it sends, hang
    ## up, then write one row per parsed timestamp through +db+.
    ## Intended to run inside a spawned thread.
    let payload = fetch_data( client )

    # The peer is finished with once the data is in hand. If closing
    # the socket (or, in testing builds, looking up the peer address)
    # raises an OSError, the batch is abandoned.
    try:
        if defined( testing ):
            let peer = get_peer_addr( client.get_fd, get_sock_domain( client.get_fd ) )
            echo "Closed connection for " & peer[0]
        close( client )
    except OSError:
        return

    let batch = parse_data( payload )
    for epoch, metrics in batch:
        # The hostname is stored in its own column, so pull it out of
        # the metrics document before that gets serialised.
        let hostname = get_str( metrics[ "hostname" ] )
        delete( metrics, "hostname" )
        exec( db, sql( INSERT_SQL ), epoch, hostname, metrics )
|
136 |
|
proc serverloop: void =
    ## Connect to the database described by conf.dbopts, bind the
    ## listening socket to conf.listen_port, then accept netdata
    ## connections forever, handing each one to a spawned worker.
    ## This proc does not return.

    # One connection handle is opened up front and passed to every
    # worker; since the accept loop never exits it is never closed.
    let db = open( "", "", "", conf.dbopts )
    echo "Successfully connected to the backend database."

    var listener = newSocket()
    echo "Listening for incoming connections on port ", conf.listen_port, "..."
    set_sock_opt( listener, OptReuseAddr, true )
    bind_addr( listener, Port(conf.listen_port) )
    listen( listener )

    while true:
        # A fresh socket per iteration; acceptAddr fills in both the
        # socket and the textual address of the peer.
        var client = newSocket()
        var address = ""

        acceptAddr( listener, client, address )
        echo "New connection: " & address
        spawn process( client, db )
|
159 |
|
160 |
|
proc parse_cmdline: void =
    ## Populate the config object with the user's preferences.
    ##
    ## Recognised switches:
    ##   --help, -h          print the usage text and exit
    ##   --version, -v       print the program version and exit
    ##   --dbopts            PostgreSQL connection string
    ##   --listen-port, -p   port to listen on (0 through 65535)
    ##
    ## Positional arguments and unknown switches are ignored. A port
    ## value that is not a number, or is outside the valid range,
    ## terminates the program with an error message and exit code 1.
    for kind, key, val in getopt():
        case kind

        of cmdArgument:
            discard

        of cmdLongOption, cmdShortOption:
            case key
                of "help", "h":
                    echo USAGE
                    quit( 0 )

                of "version", "v":
                    echo "netdata_tsrelay ", VERSION
                    quit( 0 )

                of "dbopts": conf.dbopts = val

                of "listen-port", "p":
                    # Validate here so a typo produces a readable
                    # message instead of an unhandled exception, either
                    # from the integer parse or later on when the value
                    # is converted to a Port.
                    var port: int
                    try:
                        port = val.parse_int
                    except ValueError:
                        quit( "Invalid listen port: \"" & val & "\"", 1 )
                    if port < 0 or port > 65535:
                        quit( "Listen port out of range (0-65535): " & val, 1 )
                    conf.listen_port = port

                else: discard

        of cmdEnd: assert( false ) # shouldn't reach here ever
|
185 |
|
186 |
|
when isMainModule:
    # Apply the command line overrides before anything reads the
    # configuration, then hand control to the accept loop.
    parse_cmdline()
    if defined( testing ):
        echo conf
    serverloop()
|
191 |