equal
deleted
inserted
replaced
48 ## This library has been tested with recent versions of RabbitMQ. If it |
48 ## This library has been tested with recent versions of RabbitMQ. If it |
49 ## works for you with another broker, please let the author know. |
49 ## works for you with another broker, please let the author know. |
50 ## |
50 ## |
51 |
51 |
52 import |
52 import |
53 strutils, |
53 std/nativesockets, |
54 nativesockets, |
54 std/net, |
55 net, |
55 std/os, |
56 os, |
56 std/strutils, |
57 times, |
57 std/times, |
58 uri |
58 std/uri |
59 |
59 |
60 const |
60 const |
61 VERSION = "0.1.3" ## The current program version. |
61 VERSION = "0.1.3" ## The current program version. |
62 NULL = "\x00" ## The NULL character. |
62 NULL = "\x00" ## The NULL character. |
63 CR = "\r" ## The carriage return character. |
63 CR = "\r" ## The carriage return character. |
128 result = 0 |
128 result = 0 |
129 var line = "" |
129 var line = "" |
130 |
130 |
131 c.socket.readline( line, c.timeout ) |
131 c.socket.readline( line, c.timeout ) |
132 while not line.is_eol: |
132 while not line.is_eol: |
133 if defined( debug ): printf " <-- %s\n", line |
133 if defined( debug ): printf " <-- %s\n", cstring(line) |
134 var header = line.split( ":" ) |
134 var header = line.split( ":" ) |
135 if header.len < 2: break |
135 if header.len < 2: break |
136 response.headers.add( (header[0], header[1]) ) |
136 response.headers.add( (header[0], header[1]) ) |
137 if cmpIgnoreCase( header[0], "content-length" ) == 0: result = header[1].parse_int |
137 if cmpIgnoreCase( header[0], "content-length" ) == 0: result = header[1].parse_int |
138 c.socket.readline( line, c.timeout ) |
138 c.socket.readline( line, c.timeout ) |
204 |
204 |
205 # All other types. |
205 # All other types. |
206 # |
206 # |
207 result.frame = line |
207 result.frame = line |
208 if defined( debug ): |
208 if defined( debug ): |
209 printf " <-- %s\n", line |
209 printf " <-- %s\n", cstring(line) |
210 |
210 |
211 # Parse headers and body. |
211 # Parse headers and body. |
212 # |
212 # |
213 var length = result.parse_headers( c ) |
213 var length = result.parse_headers( c ) |
214 if result.frame == "MESSAGE" or result.frame == "ERROR": |
214 if result.frame == "MESSAGE" or result.frame == "ERROR": |
317 case opt[0]: |
317 case opt[0]: |
318 of "heartbeat": |
318 of "heartbeat": |
319 result.options.heartbeat = opt[1].parse_int |
319 result.options.heartbeat = opt[1].parse_int |
320 else: |
320 else: |
321 discard |
321 discard |
322 except IndexError, ValueError: |
322 except IndexDefect, ValueError: |
323 discard |
323 discard |
324 |
324 |
325 # Set default STOMP port if otherwise unset. |
325 # Set default STOMP port if otherwise unset. |
326 # |
326 # |
327 if not result.uri.scheme.contains( "stomp" ): |
327 if not result.uri.scheme.contains( "stomp" ): |
761 var heartbeats = 0 |
761 var heartbeats = 0 |
762 echo "* Waiting on messages from publisher. Compile with -d:debug to see the Stomp conversation." |
762 echo "* Waiting on messages from publisher. Compile with -d:debug to see the Stomp conversation." |
763 |
763 |
764 proc receive_message( c: StompClient, r: StompResponse ) = |
764 proc receive_message( c: StompClient, r: StompResponse ) = |
765 messages.add( r ) |
765 messages.add( r ) |
766 case r.frame: |
|
767 of "RECEIPT": |
|
768 discard |
|
769 of "MESSAGE": |
|
770 discard r.payload |
|
771 discard r[ "ack" ] |
|
772 |
766 |
773 proc seen_heartbeat( c: StompClient, r: StompResponse ) = |
767 proc seen_heartbeat( c: StompClient, r: StompResponse ) = |
774 heartbeats = heartbeats + 1 |
768 heartbeats = heartbeats + 1 |
775 |
769 |
776 stomp.message_callback = receive_message |
770 stomp.message_callback = receive_message |