nsq-to-gs
Stream an NSQ channel to Google Cloud Storage
Based on nsq-to-s3 by chrusty
Written (more like adjusted) by Eran Sandler (@erans) http://eran.sandler.co.il
Parameters
- topic: The NSQ topic to subscribe to
- channel: An NSQ channel name to use (defaults to an automatically-generated ephemeral channel)
- max-in-flight: The maximum number of unfinished messages to allow (effectively a flush-batch size)
- max-in-flight-time: The maximum number of seconds to wait before flushing (in case max-in-flight is not reached)
- lookupd-http-address: The address of an NSQLookup daemon to connect to
- nsqd-tcp-address: A specific NSQ daemon to connect to
- bucket-seconds: The time-bucket size, in seconds, of each file you want to end up with on GS, if we don't hit bucket-messages first (eg 3600 will give you one file on GS per hour)
- bucket-messages: The maximum number of messages to put in each file on GS (if bucket-seconds doesn't elapse first)
- gsbucket: The GS bucket to store the files on (eg "nsq-archive")
- gspath: A path to store the archive files under (eg "/live-dumps")
- gsfileprefix: The generated file name prefix (eg "mylogfile" which would give mylogfile-20151117_1003.json.gz)
- batchmode: Which mode to run in [memory, disk, channel]
- bufferfile: The name of a file to use as a local on-disk buffer between flushes to GS (should be something durable)
- extension: Extension for files on GS (default is json)
Modes (current)
NSQ-to-GS can operate in several different modes, depending on your storage and/or durability requirements:
"Batch-on-disk"
- Subs to NSQ
- De-dupes in memory (map[string]bool where the string key is a hash of the message payload)
- Once max-in-flight is reached it flushes messages to disk then Finish()es them
- After the time bucket (bucket-seconds) has elapsed it stops consuming, sticks the file on GS, clears the de-dupe map and continues
- You would be well-advised to put the buffer file (bufferfile) on some kind of persistent storage
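The in-memory de-dupe step described above could look something like the following minimal sketch. The deduper type and its methods are hypothetical names, and SHA-256 is an assumption: the README only says the key is "a hash of the message payload".

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// deduper remembers which payloads have been seen in the current time bucket.
type deduper struct {
	seen map[string]bool
}

func newDeduper() *deduper {
	return &deduper{seen: make(map[string]bool)}
}

// isDuplicate records the payload and reports whether it was already seen.
// The map key is the hex-encoded SHA-256 of the payload (assumed hash).
func (d *deduper) isDuplicate(payload []byte) bool {
	sum := sha256.Sum256(payload)
	key := hex.EncodeToString(sum[:])
	if d.seen[key] {
		return true
	}
	d.seen[key] = true
	return false
}

// reset clears the map, as happens after each upload to GS.
func (d *deduper) reset() {
	d.seen = make(map[string]bool)
}

func main() {
	d := newDeduper()
	fmt.Println(d.isDuplicate([]byte(`{"id":1}`))) // false: first time seen
	fmt.Println(d.isDuplicate([]byte(`{"id":1}`))) // true: same payload again
	d.reset()
	fmt.Println(d.isDuplicate([]byte(`{"id":1}`))) // false: map was cleared
}
```

Because the map is cleared at each upload, a message redelivered just after a flush will not be recognised as a duplicate.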
"In-memory"
As with batch-on-disk but all messages are kept in-memory between flushes to GS. If you stop the process then you will lose messages!
Modes (planned)
"Abandoned-channel"
- Subs to NSQ (creates a channel)
- Waits for the time bucket (bucket-seconds) to elapse
- Pauses the channel
- Takes all the messages off the queue, de-dupes in memory, sticks them on GS
- Finish()es the messages
- Unpauses the channel
- Repeat
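Since this mode is only planned, the following is purely a sketch of how the pause and unpause steps might be driven. It assumes the nsqd HTTP API endpoints /channel/pause and /channel/unpause (called with POST) and an nsqd HTTP address such as 10.0.0.2:4151, which is not one of the parameters listed above. The helper name channelPauseURL is hypothetical.

```go
package main

import (
	"fmt"
	"net/url"
)

// channelPauseURL builds the nsqd HTTP URL that pauses or unpauses a channel.
// Assumption: nsqdHTTPAddr is the HTTP (not TCP) address of an nsqd instance.
func channelPauseURL(nsqdHTTPAddr, topic, channel string, pause bool) string {
	action := "unpause"
	if pause {
		action = "pause"
	}
	q := url.Values{}
	q.Set("topic", topic)
	q.Set("channel", channel)
	u := url.URL{
		Scheme:   "http",
		Host:     nsqdHTTPAddr,
		Path:     "/channel/" + action,
		RawQuery: q.Encode(), // keys are emitted in sorted order
	}
	return u.String()
}

func main() {
	// The URLs would be sent with an HTTP POST before and after draining the queue.
	fmt.Println(channelPauseURL("10.0.0.2:4151", "firehose", "archive", true))
	fmt.Println(channelPauseURL("10.0.0.2:4151", "firehose", "archive", false))
}
```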
"Continuous-sync-to-gs"
- As with batch-on-disk but syncs to GS every x seconds
- Either overwrites the same file on GS, or piles up new ones
- At the end of the time-bucket the interim files are removed from GS
Examples
Consuming a topic, buffering on disk, flushing in-flight at 1000 messages, flushing to GS every 5 minutes:
nsq-to-gs -gsproject=myproject -gsbucket=nsq-archive -topic=firehose -channel='nsq-to-gs#ephemeral' -lookupd-http-address=10.0.0.2:4161 -gspath=/live-dumps/firehose -bucket-seconds=300 -max-in-flight=1000 -batchmode=disk
Bugs (current)
- Dupes can still occur around flush boundaries
- The timer for flushing to GS is based on events arriving (not on absolute time). This means that the filenames/numbers will creep (just being pedantic)
- Should optionally compress files for GS