Initialize the connection, creating the Redis command methods, and setting the default connection options and callbacks.
# File lib/sensu/redis/client.rb, line 16
def initialize(options={})
  create_command_methods!
  @host = options[:host] || "127.0.0.1"
  @port = (options[:port] || 6379).to_i
  @db = options[:db]
  @password = options[:password]
  @auto_reconnect = options.fetch(:auto_reconnect, true)
  @reconnect_on_error = options.fetch(:reconnect_on_error, true)
  @error_callback = Proc.new {}
  @reconnect_callbacks = {
    :before => Proc.new {},
    :after => Proc.new {}
  }
end
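The way the defaults are applied can be sketched in isolation. The helper name `connection_defaults` below is hypothetical and not part of the library; it only mirrors the option handling in the listing above, to show why the boolean options go through `fetch()` rather than `||`.

```ruby
# Hypothetical helper (not part of sensu-redis) mirroring the defaults
# applied by initialize().
def connection_defaults(options = {})
  {
    :host => options[:host] || "127.0.0.1",
    :port => (options[:port] || 6379).to_i,
    # fetch() preserves an explicit false; `||` would turn it back into true.
    :auto_reconnect => options.fetch(:auto_reconnect, true),
    :reconnect_on_error => options.fetch(:reconnect_on_error, true)
  }
end

defaults = connection_defaults
custom = connection_defaults(:port => "6380", :auto_reconnect => false)
```

With no options, the sketch yields the loopback host and port 6379. A port given as a string is converted to an integer, and an explicit `false` for `:auto_reconnect` is kept.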
Set the connection after reconnect callback. This callback is called after a successful reconnect, after the connection has been validated.
# File lib/sensu/redis/client.rb, line 63
def after_reconnect(&block)
  @reconnect_callbacks[:after] = block
end
Authenticate to Redis if a password has been set in the connection options. This method uses `send_command()` directly, as it assumes that the connection has been established. Redis authentication must be done prior to issuing other Redis commands.
@yield the callback called once authenticated.
# File lib/sensu/redis/client.rb, line 230
def authenticate
  if @password
    send_command(AUTH_COMMAND, @password) do |authenticated|
      if authenticated
        yield if block_given?
      else
        error(ConnectionError, "redis authenticate failed")
      end
    end
  else
    yield if block_given?
  end
end
Set the connection before reconnect callback. This callback is called after the connection closes but before a reconnect is attempted.
# File lib/sensu/redis/client.rb, line 56
def before_reconnect(&block)
  @reconnect_callbacks[:before] = block
end
Begin a multi bulk response array for an expected number of responses. Using this method causes `dispatch_response()` to wait until all of the expected responses have been added to the array before the Redis command response callback is called.
@param multibulk_count [Integer] number of expected responses.
# File lib/sensu/redis/client.rb, line 290
def begin_multibulk(multibulk_count)
  @multibulk_count = multibulk_count
  @multibulk_values = []
end
Determine if the connection is connected to Redis.
# File lib/sensu/redis/client.rb, line 83
def connected?
  @connected || false
end
This method is called by EM when the connection is established. This method is responsible for validating the connection before Redis commands can be sent.
# File lib/sensu/redis/client.rb, line 269
def connection_completed
  @response_callbacks = []
  @multibulk_count = false
  @connected = true
  authenticate do
    select_db
    verify_version do
      succeed
      @reconnect_callbacks[:after].call if @reconnecting
      @reconnecting = false
    end
  end
end
Create Redis command methods. Command methods wrap `redis_command()`. This method is called by `initialize()`.
# File lib/sensu/redis/client.rb, line 183
def create_command_methods!
  COMMANDS.each do |command|
    self.class.send(:define_method, command.to_sym) do |*arguments, &block|
      redis_command(command, *arguments, &block)
    end
  end
end
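The effect of generating command methods with `define_method` can be seen in a small, self-contained sketch. The `CommandRecorder` class and its shortened `COMMANDS` list are assumptions made for illustration, and its `redis_command()` only records calls instead of talking to Redis.

```ruby
# Hypothetical stand-in for the client, used only to illustrate the
# define_method pattern from create_command_methods!.
class CommandRecorder
  # Assumed, shortened list; the library's COMMANDS constant is not shown here.
  COMMANDS = ["get", "set", "del"]

  attr_reader :sent

  def initialize
    @sent = []
    COMMANDS.each do |command|
      # Each generated method forwards its name, arguments and block.
      self.class.send(:define_method, command.to_sym) do |*arguments, &block|
        redis_command(command, *arguments, &block)
      end
    end
  end

  # Records the command instead of sending it, then replies with "OK".
  def redis_command(command, *arguments, &block)
    @sent << [command, *arguments]
    block.call("OK") if block
  end
end

recorder = CommandRecorder.new
reply = nil
recorder.set("foo", "bar") { |value| reply = value }
recorder.get("foo")
```

After these calls, `recorder.sent` holds one entry per generated method call, with the command name prepended to the arguments.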
Determine the current Redis master address. If Sentinel was used to determine the original address, use it again. If Sentinel is not being used, return the host and port used when the connection was first established.
@yield callback called when the current Redis master host and port has been determined.
# File lib/sensu/redis/client.rb, line 38
def determine_address(&block)
  if @sentinel
    @sentinel.resolve(&block)
  else
    block.call(@host, @port)
  end
end
Dispatch a Redis error, dropping the associated Redis command response callback, and passing a Redis error object to the error callback (if set).
@param code [String] Redis error code.
# File lib/sensu/redis/client.rb, line 300
def dispatch_error(code)
  @response_callbacks.shift
  error(CommandError, code)
end
Dispatch a response. If a multi bulk response has begun, this method will build the completed response array before the associated Redis command response callback is called. If one or more pubsub callbacks are defined, the appropriate pubsub callbacks are called with the pubsub response. Redis command response callbacks may have an optional processor block, responsible for producing a value with the correct type, e.g. “1” -> true (boolean).
@param value [Object]
# File lib/sensu/redis/client.rb, line 315
def dispatch_response(value)
  if @multibulk_count
    @multibulk_values << value
    @multibulk_count -= 1
    if @multibulk_count == 0
      value = @multibulk_values
      @multibulk_count = false
    else
      return
    end
  end
  if @pubsub_callbacks && value.is_a?(Array)
    if PUBSUB_RESPONSES.include?(value[0])
      @pubsub_callbacks[value[1]].each do |block|
        block.call(*value) if block
      end
      return
    end
  end
  processor, block = @response_callbacks.shift
  if block
    value = processor.call(value) if processor
    block.call(value)
  end
end
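The interplay between `begin_multibulk()`, the callback queue, and processors may be easier to follow in a reduced sketch. The `ResponseCollector` class and its `expect()` method are hypothetical; pubsub handling is left out, and the processor shown is an assumed example rather than one of the library's own.

```ruby
# Hypothetical stand-in for the client, keeping only the multi bulk
# accumulation and the response callback queue.
class ResponseCollector
  def initialize
    @response_callbacks = []
    @multibulk_count = false
  end

  # Queue a callback with an optional processor (assumed helper).
  def expect(processor = nil, &block)
    @response_callbacks << [processor, block]
  end

  def begin_multibulk(count)
    @multibulk_count = count
    @multibulk_values = []
  end

  def dispatch_response(value)
    if @multibulk_count
      @multibulk_values << value
      @multibulk_count -= 1
      # Hold the value back until the whole array has arrived.
      return unless @multibulk_count == 0
      value = @multibulk_values
      @multibulk_count = false
    end
    processor, block = @response_callbacks.shift
    return unless block
    value = processor.call(value) if processor
    block.call(value)
  end
end

results = []
collector = ResponseCollector.new
collector.expect { |value| results << value }
collector.expect(lambda { |value| value == 1 }) { |value| results << value }

collector.begin_multibulk(2)
collector.dispatch_response("foo") # held back
collector.dispatch_response("bar") # completes the array, first callback runs
collector.dispatch_response(1)     # second callback runs, processor applied
```

The first callback receives the completed array only once both elements are in, and the second receives the processed boolean rather than the raw integer.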
Create an error and pass it to the connection error callback. This method will close the current connection and trigger a reconnect (via `unbind()`) if `@reconnect_on_error` is `true`. Closing the connection here is necessary to stop EventMachine from reusing the same connection handler (we want a fresh Redis connection).
@param klass [Class]
@param message [String]
# File lib/sensu/redis/client.rb, line 76
def error(klass, message)
  redis_error = klass.new(message)
  @error_callback.call(redis_error)
  close_connection if @reconnect_on_error
end
Set the connection error callback. This callback is called when the connection encounters either a connection, protocol, or command error.
# File lib/sensu/redis/client.rb, line 49
def on_error(&block)
  @error_callback = block
end
Parse a RESP line. This method is called by `receive_data()`. RESP is described at redis.io/topics/protocol
@param line [String]
# File lib/sensu/redis/client.rb, line 345
def parse_response_line(line)
  # Trim off the response type and delimiter (\r\n).
  response = line.slice(1..-3)
  # First character indicates response type.
  case line[0, 1]
  when MINUS # Error, e.g. -ERR
    dispatch_error(response)
  when PLUS # String, e.g. +OK
    dispatch_response(response)
  when DOLLAR # Bulk string, e.g. $3\r\nfoo\r\n
    response_length = Integer(response)
    if response_length == -1 # No data, return nil.
      dispatch_response(nil)
    elsif @buffer.length >= response_length + 2 # Complete data.
      dispatch_response(@buffer.slice!(0, response_length))
      @buffer.slice!(0,2) # Discard delimiter (\r\n).
    else # Incomplete, have data pushed back into buffer.
      return INCOMPLETE
    end
  when COLON # Integer, e.g. :8
    dispatch_response(Integer(response))
  when ASTERISK # Array, e.g. *2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n
    multibulk_count = Integer(response)
    if multibulk_count == -1 || multibulk_count == 0 # No data, return [].
      dispatch_response([])
    else
      begin_multibulk(multibulk_count) # Accumulate responses.
    end
  else
    error(ProtocolError, "response type not recognized: #{line.strip}")
  end
end
This method is called by EM when the connection receives data. This method assumes that the incoming data uses RESP, and parses it line by line with `parse_response_line()`.
@param data [String]
# File lib/sensu/redis/client.rb, line 383
def receive_data(data)
  (@buffer ||= '') << data
  while index = @buffer.index(DELIM)
    line = @buffer.slice!(0, index+2)
    if parse_response_line(line) == INCOMPLETE
      @buffer[0...0] = line
      break
    end
  end
end
Reconnect to Redis. The before reconnect callback is first called if not already reconnecting. This method uses a 1 second delay before attempting a reconnect. The method `determine_address()` is used to determine the correct host and port to reconnect to, either via Sentinel (new master) or the previous host and port. This method uses `resolve_host()` to first resolve the determined host, if it's not already an IP address. Resolving the hostname upfront guards against lookup failures that would cause the Sensu process to crash. Upfront hostname resolution also allows this Redis library to work with Amazon AWS ElastiCache and wherever DNS is used as a failover mechanism.
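The buffering behaviour, including the push-back of an incomplete bulk string, can be traced with a reduced reader. The `BufferedReader` class is a hypothetical sketch that handles only simple strings, integers, and bulk strings, and collects values in an array instead of dispatching them to callbacks.

```ruby
DELIM = "\r\n"
INCOMPLETE = :incomplete

# Hypothetical, reduced reader: no errors, arrays or callbacks.
class BufferedReader
  attr_reader :values

  def initialize
    @buffer = String.new
    @values = []
  end

  def receive_data(data)
    @buffer << data
    while index = @buffer.index(DELIM)
      line = @buffer.slice!(0, index + 2)
      if parse_line(line) == INCOMPLETE
        # Put the header line back and wait for more data.
        @buffer[0...0] = line
        break
      end
    end
  end

  def parse_line(line)
    payload = line.slice(1..-3) # drop the type character and \r\n
    case line[0, 1]
    when "+"
      @values << payload
    when ":"
      @values << Integer(payload)
    when "$"
      length = Integer(payload)
      return INCOMPLETE if @buffer.length < length + 2
      @values << @buffer.slice!(0, length)
      @buffer.slice!(0, 2) # discard the trailing \r\n
    end
  end
end

reader = BufferedReader.new
reader.receive_data("+OK\r\n$6\r\nfoo") # bulk string is cut off mid-payload
partial = reader.values.dup
reader.receive_data("bar\r\n:8\r\n")    # remainder arrives with an integer
```

After the first chunk only the simple string has been collected. The bulk string header is pushed back into the buffer and parsed again once the rest of the payload arrives.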
# File lib/sensu/redis/client.rb, line 99
def reconnect!
  @reconnect_callbacks[:before].call unless @reconnecting
  @reconnecting = true
  EM.add_timer(1) do
    determine_address do |host, port|
      resolve_host(host) do |ip_address|
        if ip_address.nil?
          reconnect!
        else
          reconnect(ip_address, port.to_i)
        end
      end
    end
  end
end
Send a Redis command once the Redis connection has been established (EM Deferrable succeeded).
@param command [String]
@param [Array<Object>] *arguments
@yield command response callback
# File lib/sensu/redis/client.rb, line 171
def redis_command(command, *arguments, &block)
  if @deferred_status == :succeeded
    send_command(command, *arguments, &block)
  else
    callback do
      send_command(command, *arguments, &block)
    end
  end
end
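The deferral can be illustrated without EventMachine. The `DeferredSender` class below is a hypothetical stand-in: its `callback()` and `succeed()` methods only imitate the queue-then-run behaviour the listing relies on, and commands are recorded rather than sent.

```ruby
# Hypothetical stand-in for a deferrable connection: callbacks are queued
# until succeed() is called.
class DeferredSender
  attr_reader :sent

  def initialize
    @sent = []
    @callbacks = []
    @deferred_status = nil
  end

  def callback(&block)
    if @deferred_status == :succeeded
      block.call
    else
      @callbacks << block
    end
  end

  def succeed
    @deferred_status = :succeeded
    @callbacks.shift.call until @callbacks.empty?
  end

  def redis_command(command, *arguments)
    if @deferred_status == :succeeded
      @sent << [command, *arguments]
    else
      callback { @sent << [command, *arguments] }
    end
  end
end

sender = DeferredSender.new
sender.redis_command("set", "foo", "bar") # queued, connection not ready
queued = sender.sent.dup
sender.succeed                            # connection validated, queue drains
sender.redis_command("get", "foo")        # sent immediately
```

Commands issued before the connection is validated are held in order and flushed when it succeeds, so callers do not need to wait for the connection themselves.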
Send a Redis command and queue the associated response callback. This method calls `send_command_data()` for RESP multi bulk and transmission.
@param command [String]
@param [Array<Object>] *arguments
@yield command response callback
# File lib/sensu/redis/client.rb, line 160
def send_command(command, *arguments, &block)
  send_command_data(command, *arguments)
  @response_callbacks << [RESPONSE_PROCESSORS[command], block]
end
Send a Redis command using RESP multi bulk. This method sends data to Redis using EM connection `send_data()`.
@param [Array<Object>] *arguments
# File lib/sensu/redis/client.rb, line 144
def send_command_data(*arguments)
  data = "*#{arguments.length}#{DELIM}"
  arguments.each do |value|
    value = value.to_s
    data << "$#{value.bytesize}#{DELIM}#{value}#{DELIM}"
  end
  send_data(data)
end
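To see what actually goes over the wire, the encoding step can be separated from the transmission. The function name `encode_command` and the constant `CRLF` (standing in for `DELIM`) are assumptions; the body follows the listing above but returns the string instead of calling `send_data()`.

```ruby
CRLF = "\r\n" # stands in for the library's DELIM constant

# Hypothetical helper: build the RESP multi bulk payload for a command.
def encode_command(*arguments)
  # Array header: number of elements.
  data = "*#{arguments.length}#{CRLF}"
  arguments.each do |value|
    value = value.to_s
    # Each element is a bulk string prefixed with its length in bytes.
    data << "$#{value.bytesize}#{CRLF}#{value}#{CRLF}"
  end
  data
end

encoded = encode_command("SET", "key", 42)
accented = encode_command("\u00e9")
```

Non-string arguments are converted with `to_s`, and the length prefix counts bytes rather than characters, so a single two-byte UTF-8 character is announced as `$2`.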
Subscribe to a Redis PubSub channel.
@param channel [String]
@yield channel message callback.
# File lib/sensu/redis/client.rb, line 195
def subscribe(channel, &block)
  @pubsub_callbacks ||= {}
  @pubsub_callbacks[channel] ||= []
  @pubsub_callbacks[channel] << block
  redis_command(SUBSCRIBE_COMMAND, channel, &block)
end
This method is called by EM when the connection closes, either intentionally or unexpectedly. This method is responsible for starting the reconnect process when appropriate.
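How subscriptions feed into response dispatching can be sketched with a small registry. The `PubSubRegistry` class is hypothetical, and the contents of `PUBSUB_TYPES` are an assumption, since the library's `PUBSUB_RESPONSES` constant is not shown in this section.

```ruby
# Hypothetical registry illustrating per-channel callbacks.
class PubSubRegistry
  # Assumed response types; the real PUBSUB_RESPONSES list is not shown here.
  PUBSUB_TYPES = ["message", "unsubscribe"]

  def initialize
    @pubsub_callbacks = {}
  end

  def subscribe(channel, &block)
    @pubsub_callbacks[channel] ||= []
    @pubsub_callbacks[channel] << block
  end

  # Returns true when the value was handled as a pubsub response.
  def dispatch(value)
    return false unless value.is_a?(Array) && PUBSUB_TYPES.include?(value[0])
    (@pubsub_callbacks[value[1]] || []).each do |block|
      block.call(*value) if block
    end
    true
  end
end

received = []
registry = PubSubRegistry.new
registry.subscribe("alerts") { |type, channel, message| received << [type, channel, message] }
registry.subscribe("alerts") { |_type, _channel, message| received << message.upcase }

handled = registry.dispatch(["message", "alerts", "disk full"])
ignored = registry.dispatch("OK")
```

Each callback registered for a channel receives the response array splatted into type, channel, and message. Unlike the listing, the sketch tolerates a response for a channel with no registered callbacks.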
# File lib/sensu/redis/client.rb, line 125
def unbind
  @deferred_status = nil
  @pubsub_callbacks = nil
  if @closing
    @reconnecting = false
  elsif ((@connected || @reconnecting) && @auto_reconnect) || @reconnect_on_error
    reconnect!
  elsif @connected
    error(ConnectionError, "connection closed")
  else
    error(ConnectionError, "unable to connect to redis server")
  end
  @connected = false
end
Unsubscribe from one or more Redis PubSub channels. If a channel is provided, this method will unsubscribe from it. If a channel is not provided, this method will unsubscribe from all Redis PubSub channels.
@param channel [String]
@yield unsubscribe callback.
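The branch order in `unbind()` is easier to check when written as a pure function. The function `unbind_action` and its symbolic return values are hypothetical; the conditions are copied from the listing above.

```ruby
# Hypothetical pure-function view of the decision made in unbind().
def unbind_action(state)
  if state[:closing]
    :stop
  elsif ((state[:connected] || state[:reconnecting]) && state[:auto_reconnect]) ||
        state[:reconnect_on_error]
    :reconnect
  elsif state[:connected]
    :connection_closed_error
  else
    :unable_to_connect_error
  end
end

closed_on_purpose = unbind_action(:closing => true, :connected => true,
                                  :auto_reconnect => true)
dropped = unbind_action(:connected => true, :auto_reconnect => true)
no_retry = unbind_action(:connected => true, :auto_reconnect => false,
                         :reconnect_on_error => false)
never_connected = unbind_action(:connected => false, :auto_reconnect => false,
                                :reconnect_on_error => false)
error_retry = unbind_action(:connected => false, :auto_reconnect => false,
                            :reconnect_on_error => true)
```

One consequence visible in the sketch: as the condition is written, a truthy `reconnect_on_error` (the default) leads to a reconnect on any close that was not requested, even when `auto_reconnect` is disabled.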
# File lib/sensu/redis/client.rb, line 209
def unsubscribe(channel=nil, &block)
  @pubsub_callbacks ||= {}
  arguments = [UNSUBSCRIBE_COMMAND]
  if channel
    @pubsub_callbacks[channel] = [block]
    arguments << channel
  else
    @pubsub_callbacks.each_key do |key|
      @pubsub_callbacks[key] = [block]
    end
  end
  # Splat so the command name and channel are passed as separate arguments.
  redis_command(*arguments)
end
Verify the version of Redis. Redis >= 2.0 RC 1 is required for certain Redis commands that Sensu uses. A connection error is created if the Redis version does not meet the requirements.
@yield the callback called once verified.
# File lib/sensu/redis/client.rb, line 256
def verify_version
  send_command(INFO_COMMAND) do |redis_info|
    if redis_info[:redis_version] < "1.3.14"
      error(ConnectionError, "redis version must be >= 2.0 RC 1")
    else
      yield if block_given?
    end
  end
end
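The listing compares version strings lexically, which works for most real version numbers but not for all of them. As an alternative sketch, and not what the library does, the check could be expressed with `Gem::Version`, which compares segment by segment. The names `MINIMUM_VERSION` and `supported_version?` are hypothetical, and the minimum is taken from the comparison in the listing.

```ruby
require "rubygems"

# Minimum taken from the string comparison in verify_version.
MINIMUM_VERSION = Gem::Version.new("1.3.14")

# Hypothetical helper: numeric, segment-wise version check.
def supported_version?(redis_version)
  Gem::Version.new(redis_version) >= MINIMUM_VERSION
end

# Lexical comparison treats "1.3.9" as not less than "1.3.14",
# because the character "9" sorts after "1".
lexically_rejected = "1.3.9" < "1.3.14"

old_release = supported_version?("1.3.9")
exact_minimum = supported_version?("1.3.14")
recent_release = supported_version?("2.8.4")
```

With the lexical check, a version such as 1.3.9 would pass even though it predates 1.3.14. The segment-wise comparison rejects it while still accepting the minimum itself and later releases.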