class Mysql::Protocol
MySQL network protocol
Constants
- MAX_PACKET_LENGTH
- SSL_MODE_KEY
- VERSION
Attributes
Public Class Methods
Source
# File lib/mysql/protocol.rb, line 25
# Decode one column value read from a packet into a Ruby object.
#
# @param pkt [Packet] packet to read the value from
# @param type [Integer] column type, one of Field::TYPE_*
# @param unsigned [true, false] true if integer columns are read as unsigned
# @return [Object] decoded value; nil for a DATE/DATETIME/TIMESTAMP that Date.new/Time.new rejects
# @raise [RuntimeError] the column type is not handled here
def self.net2value(pkt, type, unsigned)
  case type
  when Field::TYPE_STRING, Field::TYPE_VAR_STRING, Field::TYPE_BLOB, Field::TYPE_JSON, Field::TYPE_GEOMETRY
    pkt.lcs
  when Field::TYPE_NEWDECIMAL
    text = pkt.lcs
    # Only a non-zero fractional part yields a BigDecimal; "12" and "12.00" become Integer.
    fractional = text =~ /\./ && text !~ /\.0*\z/
    fractional ? BigDecimal(text) : text.to_i
  when Field::TYPE_TINY
    raw = pkt.utiny
    (unsigned || raw < 128) ? raw : raw - 256
  when Field::TYPE_SHORT
    raw = pkt.ushort
    (unsigned || raw < 32768) ? raw : raw - 65536
  when Field::TYPE_INT24, Field::TYPE_LONG
    raw = pkt.ulong
    (unsigned || raw < 0x8000_0000) ? raw : raw - 0x10000_0000
  when Field::TYPE_LONGLONG
    # Two 32-bit words, low word first.
    low = pkt.ulong
    high = pkt.ulong
    raw = (high << 32) | low
    (unsigned || raw < 0x8000_0000_0000_0000) ? raw : raw - 0x10000_0000_0000_0000
  when Field::TYPE_FLOAT
    pkt.read(4).unpack1('e')
  when Field::TYPE_DOUBLE
    pkt.read(8).unpack1('E')
  when Field::TYPE_DATE
    size = pkt.utiny
    year, month, day = pkt.read(size).unpack("vCC")
    begin
      Date.new(year, month, day)
    rescue StandardError
      nil # values Date.new cannot build (e.g. missing or zero fields) map to nil
    end
  when Field::TYPE_DATETIME, Field::TYPE_TIMESTAMP
    size = pkt.utiny
    year, month, day, hour, minute, second, micro = pkt.read(size).unpack("vCCCCCV")
    begin
      Time.new(year, month, day, hour, minute, Rational(second.to_i*1000000+micro.to_i, 1000000))
    rescue StandardError
      nil # values Time.new cannot build map to nil
    end
  when Field::TYPE_TIME
    size = pkt.utiny
    sign, days, hours, minutes, seconds, micros = pkt.read(size).unpack("CVCCCV")
    # TIME is returned as a (possibly fractional) number of seconds.
    total = days.to_i*86400 + hours.to_i*3600 + minutes.to_i*60 + seconds.to_i + micros.to_f/1000000
    total *= -1 if sign != 0
    total
  when Field::TYPE_YEAR
    pkt.ushort
  when Field::TYPE_BIT
    pkt.lcs
  else
    raise "not implemented: type=#{type}"
  end
end
Convert netdata to a Ruby value. @param pkt [Packet] packet data @param type [Integer] field type @param unsigned [true or false] true if value is unsigned @return [Object] converted value.
Source
# File lib/mysql/protocol.rb, line 166
# Initialise connection state and obtain the socket used to talk to the server.
#
# The socket is, in order of preference: the caller-supplied opts[:io], a
# UNIX-domain socket when the host is nil, empty or "localhost", or a TCP
# connection to opts[:host].
#
# @param opts [Hash] connection options (:io, :host, :port, :socket, :connect_timeout, ...)
# @raise [ClientError] the connection attempt timed out
def initialize(opts)
  @mutex = Mutex.new
  @opts = opts
  @charset = Mysql::Charset.by_name("utf8mb4")
  @insert_id = 0
  @warning_count = 0
  @session_track = {}
  @gc_stmt_queue = [] # stmt id list which GC destroy.
  set_state :INIT
  @get_server_public_key = @opts[:get_server_public_key]
  begin
    host = @opts[:host]
    @socket =
      if @opts[:io]
        @opts[:io]
      elsif host.nil? || host.empty? || host == "localhost"
        path = @opts[:socket] || ENV["MYSQL_UNIX_PORT"] || MYSQL_UNIX_PORT
        Socket.unix(path)
      else
        # Fall back to the compiled-in port when the services database has no "mysql" entry.
        port = @opts[:port] || ENV["MYSQL_TCP_PORT"] || (Socket.getservbyname("mysql", "tcp") rescue MYSQL_TCP_PORT)
        Socket.tcp(host, port, connect_timeout: @opts[:connect_timeout])
      end
  rescue Errno::ETIMEDOUT
    raise ClientError, "connection timeout"
  end
end
make socket connection to server. @param opts [Hash] @option :host [String] hostname mysqld running @option :username [String] username to connect to mysqld @option :password [String] password to connect to mysqld @option :database [String] initial database name @option :port [String] port number (used if host is not ‘localhost’ or nil) @option :socket [String] socket filename (used if host is ‘localhost’ or nil) @option :flags [Integer] connection flag. Mysql::CLIENT_* ORed @option :charset [Mysql::Charset] character set @option :connect_timeout [Numeric, nil] @option :read_timeout [Numeric, nil] @option :write_timeout [Numeric, nil] @option :local_infile [Boolean] @option :load_data_local_dir [String] @option :ssl_mode [Integer] @option :ssl_context_params [Hash<:Symbol, String>] @option :get_server_public_key [Boolean] @option :io [BasicSocket, OpenSSL::SSL::SSLSocket] Existing socket instance that will be used instead of creating a new socket @raise [ClientError] connection timeout
Source
# File lib/mysql/protocol.rb, line 78
# Encode a Ruby value as a column type plus its packed wire representation.
#
# true/false are sent as the integers 1/0. Integers that do not fit in
# 64 bits are sent as a decimal string.
#
# @param v [Object] Ruby value
# @return [Array(Integer, String)] Field::TYPE_* code and the packed data
# @raise [ProtocolError] the value's class is not supported
def self.value2net(v)
  v = if v == true
        1
      elsif v == false
        0
      else
        v
      end

  case v
  when nil
    [Field::TYPE_NULL, ""]
  when Integer
    if (-0x8000_0000...0x8000_0000).cover?(v)
      [Field::TYPE_LONG, [v].pack('V')]
    elsif (-0x8000_0000_0000_0000...0x8000_0000_0000_0000).cover?(v)
      [Field::TYPE_LONGLONG, [v&0xffffffff, v>>32].pack("VV")]
    elsif (0x8000_0000_0000_0000..0xffff_ffff_ffff_ffff).cover?(v)
      # 0x8000 is ORed into the type code for values above the signed 64-bit range.
      [Field::TYPE_LONGLONG | 0x8000, [v&0xffffffff, v>>32].pack("VV")]
    else
      [Field::TYPE_NEWDECIMAL, Packet.lcs(v.to_s)]
    end
  when BigDecimal
    [Field::TYPE_NEWDECIMAL, Packet.lcs(v.to_s)]
  when Float
    [Field::TYPE_DOUBLE, [v].pack("E")]
  when String
    [Field::TYPE_STRING, Packet.lcs(v)]
  when Time
    [Field::TYPE_DATETIME, [11, v.year, v.month, v.day, v.hour, v.min, v.sec, v.usec].pack("CvCCCCCV")]
  when DateTime
    # Must be tested before Date: DateTime is a subclass of Date.
    [Field::TYPE_DATETIME,
     [11, v.year, v.month, v.day, v.hour, v.min, v.sec, (v.sec_fraction*1000000).to_i].pack("CvCCCCCV")]
  when Date
    [Field::TYPE_DATE, [11, v.year, v.month, v.day, 0, 0, 0, 0].pack("CvCCCCCV")]
  else
    raise ProtocolError, "class #{v.class} is not supported"
  end
end
convert Ruby value to netdata @param v [Object] Ruby value. @return [Integer] type of column. Field::TYPE_* @return [String] netdata @raise [ProtocolError] value too large / value is not supported
Public Instance Methods
Source
# File lib/mysql/protocol.rb, line 198
# Read the server's initial packet, negotiate capabilities/charset/TLS and
# authenticate. Moves the connection from :INIT to :READY.
#
# The connection charset comes from opts[:charset] when given (a Charset or
# a charset name); otherwise the charset announced by the server is used.
def authenticate
  synchronize(before: :INIT, after: :READY) do
    reset
    handshake = InitialPacket.parse(read)
    @server_info = handshake.server_version
    # First three numeric components folded base-100, e.g. "8.0.33-log" -> 80033.
    @server_version = handshake.server_version.split(/\D/)[0, 3].inject{|a, b| a.to_i*100+b.to_i}
    @server_capabilities = handshake.server_capabilities
    @thread_id = handshake.thread_id

    @client_flags = [
      CLIENT_LONG_PASSWORD, CLIENT_LONG_FLAG, CLIENT_TRANSACTIONS, CLIENT_PROTOCOL_41,
      CLIENT_SECURE_CONNECTION, CLIENT_MULTI_RESULTS, CLIENT_PS_MULTI_RESULTS, CLIENT_PLUGIN_AUTH,
      CLIENT_CONNECT_ATTRS, CLIENT_SESSION_TRACK, CLIENT_LOCAL_FILES
    ].inject(:|)
    @client_flags |= CLIENT_CONNECT_WITH_DB if @opts[:database]
    @client_flags |= @opts[:flags]

    requested = @opts[:charset]
    if requested
      @charset = requested.is_a?(Charset) ? requested : Charset.by_name(requested)
    else
      @charset = Charset.by_number(handshake.server_charset)
      @charset.encoding # raise error if unsupported charset
    end

    enable_ssl
    Authenticator.new(self).authenticate(
      @opts[:username],
      @opts[:password].to_s,
      @opts[:database],
      handshake.scramble_buff,
      handshake.auth_plugin,
      @opts[:connect_attrs]
    )
  end
end
Initial negotiation and authentication. The connection charset is taken from the :charset option; if it is not given, the server’s charset is used. @raise [ProtocolError] The old style password is not supported
Source
# File lib/mysql/protocol.rb, line 475
# Ensure the connection is currently in the expected state.
#
# @param st [Symbol] required state
# @raise [Mysql::ClientError::CommandsOutOfSync] the current state differs from st
def check_state(st)
  return if @state == st

  raise Mysql::ClientError::CommandsOutOfSync, 'command out of sync'
end
Source
# File lib/mysql/protocol.rb, line 243
# Upgrade the connection to TLS according to opts[:ssl_mode].
#
# opts[:ssl_mode] is looked up in SSL_MODE_KEY to get the numeric mode that
# every comparison below uses. Nothing happens when the mode is DISABLED, or
# when it is PREFERRED and either the socket is a UNIX socket or the server
# does not advertise CLIENT_SSL.
#
# @raise [ClientError] opts[:ssl_mode] is not a key of SSL_MODE_KEY
# @raise [ClientError::SslConnectionError] TLS is required but the server lacks CLIENT_SSL
# @raise [OpenSSL::SSL::SSLError] TLS setup failed and the mode is REQUIRED or stricter
def enable_ssl
  ssl_mode = SSL_MODE_KEY[@opts[:ssl_mode]]
  raise ClientError, "ssl_mode #{@opts[:ssl_mode]} is not supported" unless ssl_mode
  return if ssl_mode == SSL_MODE_DISABLED

  server_has_ssl = @server_capabilities & CLIENT_SSL != 0
  if ssl_mode == SSL_MODE_PREFERRED
    return if @socket.local_address.unix?
    return unless server_has_ssl
  end
  if ssl_mode >= SSL_MODE_REQUIRED && !server_has_ssl
    raise ClientError::SslConnectionError, "SSL is required but the server doesn't support it"
  end

  context = OpenSSL::SSL::SSLContext.new
  context.set_params(@opts[:ssl_context_params])
  context.verify_mode = OpenSSL::SSL::VERIFY_NONE if ssl_mode < SSL_MODE_VERIFY_CA
  context.verify_hostname = false if ssl_mode < SSL_MODE_VERIFY_IDENTITY
  ssl_socket = OpenSSL::SSL::SSLSocket.new(@socket, context)
  ssl_socket.sync_close = true
  ssl_socket.hostname = @opts[:host] if ssl_mode >= SSL_MODE_VERIFY_IDENTITY

  @client_flags |= CLIENT_SSL
  write Protocol::TlsAuthenticationPacket.serialize(@client_flags, 1024**3, @charset.number)
  ssl_socket.connect
  @socket = ssl_socket
rescue OpenSSL::SSL::SSLError
  @client_flags &= ~CLIENT_SSL
  # Compare the mapped numeric mode, not the raw option: the raw value may be
  # a Symbol or String, which cannot be compared with an Integer.
  return if ssl_mode < SSL_MODE_REQUIRED

  raise
end
Source
# File lib/mysql/protocol.rb, line 471
# Queue a statement id so that set_state can send COM_STMT_CLOSE for it once
# the connection is :READY.
#
# @param stmt_id [Integer] statement id
def gc_stmt(stmt_id)
  @gc_stmt_queue << stmt_id
end
Source
# File lib/mysql/protocol.rb, line 304
# Read the result packet of the last command.
#
# @return [Integer, nil] number of fields when a result set follows (state
#   becomes :FIELD); nil when the command produced no result set
def get_result
  synchronize(before: :WAIT_RESULT, error: :READY) do
    result = ResultPacket.parse(read)
    @field_count = result.field_count

    if @field_count.to_i.positive? # result data exists
      set_state :FIELD
      return @field_count
    end

    if @field_count.nil? # LOAD DATA LOCAL INFILE
      send_local_file(result.message)
      result = ResultPacket.parse(read)
    end

    @affected_rows  = result.affected_rows
    @insert_id      = result.insert_id
    @server_status  = result.server_status
    @warning_count  = result.warning_count
    @message        = result.message
    @session_track  = result.session_track
    set_state :READY unless more_results?
    return nil
  end
end
Get the result of a query. @return [Integer, nil] number of fields of results. nil if no results.
Source
# File lib/mysql/protocol.rb, line 401
# Send COM_PROCESS_KILL for the given id.
#
# @param pid [Integer] id packed as a 32-bit little-endian value
# @return [String] data received in response
def kill_command(pid)
  payload = [COM_PROCESS_KILL, pid].pack("CV")
  simple_command(payload)
end
Kill command
Source
# File lib/mysql/protocol.rb, line 323
# @return [Boolean] true when the SERVER_MORE_RESULTS_EXISTS bit is set in the last server status
def more_results?
  pending = @server_status & SERVER_MORE_RESULTS_EXISTS
  pending != 0
end
Source
# File lib/mysql/protocol.rb, line 396
# Send COM_PING.
#
# @return [String] data received in response
def ping_command
  payload = [COM_PING].pack("C")
  simple_command(payload)
end
Ping command
Source
# File lib/mysql/protocol.rb, line 295
# Send COM_QUERY with the query converted to the connection charset.
# Moves the connection from :READY to :WAIT_RESULT (back to :READY on error).
#
# @param query [String] query string
def query_command(query)
  synchronize(before: :READY, after: :WAIT_RESULT, error: :READY) do
    reset
    converted = @charset.convert(query)
    write [COM_QUERY, converted].pack("Ca*")
  end
end
Query command @param query [String] query string
Source
# File lib/mysql/protocol.rb, line 281
# Send COM_QUIT and close the connection.
#
# Any pending result is consumed first. The three checks run in sequence
# because each step can leave the connection in the state the next one tests.
def quit_command
  if @state == :WAIT_RESULT
    get_result
  end
  if @state == :FIELD
    retr_fields
  end
  if @state == :RESULT
    retr_all_records(RawRecord)
  end

  synchronize(before: :READY, after: :CLOSED) do
    reset
    write [COM_QUIT].pack("C")
    close
    @gc_stmt_queue.clear
  end
end
Quit command
Source
# File lib/mysql/protocol.rb, line 516
# Read one logical packet, concatenating chunks while a chunk is exactly
# MAX_PACKET_LENGTH bytes long, and raise if it is an error packet.
#
# @return [Packet] packet data
# @raise [ProtocolError] a chunk's sequence number is not the expected one
# @raise [ClientError::ServerLost] the connection ended (or TLS failed) mid-read
# @raise [ClientError] read timeout
# @raise [Mysql::ServerError] the server sent an error packet
def read
  data = +''
  loop do
    len = nil
    begin
      # The connect timeout applies until authentication has left the :INIT state.
      timeout = @state == :INIT ? @opts[:connect_timeout] : @opts[:read_timeout]
      header = read_timeout(4, timeout)
      raise EOFError unless header && header.length == 4
      # Header: 3-byte little-endian payload length followed by a 1-byte sequence number.
      len_low, len_high, seq = header.unpack("CvC")
      len = (len_high << 8) + len_low
      raise ProtocolError, "invalid packet: sequence number mismatch(#{seq} != #{@seq}(expected))" if @seq != seq
      @seq = (@seq + 1) % 256
      chunk = read_timeout(len, timeout)
      raise EOFError unless chunk && chunk.length == len
      data.concat chunk
    rescue EOFError, OpenSSL::SSL::SSLError
      close
      raise ClientError::ServerLost, 'Lost connection to server during query'
    rescue Errno::ETIMEDOUT
      raise ClientError, "read timeout"
    end
    break unless len == MAX_PACKET_LENGTH
  end

  @sqlstate = "00000"
  return Packet.new(data) unless data[0] == ?\xff

  # Error packet
  _, errno, marker, @sqlstate, message = data.unpack("Cvaa5a*")
  unless marker == "#"
    _, errno, message = data.unpack("Cva*") # Version 4.0 Error
    @sqlstate = ""
  end
  @server_status &= ~SERVER_MORE_RESULTS_EXISTS
  message.force_encoding(@charset.encoding)
  if Mysql::ServerError::ERROR_MAP.key?(errno)
    raise Mysql::ServerError::ERROR_MAP[errno].new(message, @sqlstate)
  end
  raise Mysql::ServerError.new(message, @sqlstate, errno)
end
Read one packet data @return [Packet] packet data @raise [ProtocolError] invalid packet sequence number
Source
# File lib/mysql/protocol.rb, line 624
# Read a packet that must be an EOF packet and record its server status.
#
# @raise [ProtocolError] packet is not EOF
def read_eof_packet
  eof = read
  raise ProtocolError, "packet is not EOF" unless eof.eof?

  eof.utiny  # 0xFE
  eof.ushort # warning count, not used
  @server_status = eof.ushort
end
Read EOF packet @raise [ProtocolError] packet is not EOF
Source
# File lib/mysql/protocol.rb, line 556
# Read +len+ bytes from the socket, giving up after +timeout+ seconds.
#
# @param len [Integer] number of bytes wanted
# @param timeout [Numeric, nil] overall deadline in seconds; nil or 0 means a plain blocking read
# @return [String, nil] the bytes read. When the peer closes the connection
#   first, the bytes received so far are returned, or nil if there were none
#   (the same shape IO#read(len) gives at end of file)
# @raise [Errno::ETIMEDOUT] the deadline passed before len bytes arrived
def read_timeout(len, timeout)
  return @socket.read(len) if timeout.nil? || timeout == 0

  result = +''
  # Monotonic clock: the deadline must not move when the wall clock is adjusted.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  while result.bytesize < len
    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    raise Errno::ETIMEDOUT if now > deadline

    r = @socket.read_nonblock(len - result.bytesize, exception: false)
    case r
    when :wait_readable
      IO.select([@socket], nil, nil, deadline - now) # rubocop:disable Lint/IncompatibleIoSelectWithFiberScheduler
    when :wait_writable
      IO.select(nil, [@socket], nil, deadline - now) # rubocop:disable Lint/IncompatibleIoSelectWithFiberScheduler
    when nil
      # End of stream: read_nonblock(exception: false) returns nil instead of
      # raising EOFError. Hand back what we have so the caller's length check
      # can report the lost connection.
      return result.empty? ? nil : result
    else
      result << r
    end
  end
  result
end
Source
# File lib/mysql/protocol.rb, line 406
# Send COM_REFRESH.
#
# @param op [Integer] value packed as a single byte after the command byte
# @return [String] data received in response
def refresh_command(op)
  payload = [COM_REFRESH, op].pack("CC")
  simple_command(payload)
end
Refresh command
Source
# File lib/mysql/protocol.rb, line 509
# Reset the packet sequence number; callers do this at the start of each command.
def reset
  # packet counter. reset by each command
  @seq = 0
end
Reset sequence number
Source
# File lib/mysql/protocol.rb, line 376
# Read every remaining row of the current result set.
#
# Whether or not reading succeeds, the state afterwards is :WAIT_RESULT if
# more results are pending and :READY otherwise.
#
# @param record_class [Class] class instantiated per row as new(pkt, fields, encoding)
# @return [Array] one record_class instance per row
def retr_all_records(record_class)
  synchronize(before: :RESULT) do
    enc = charset.encoding
    begin
      rows = []
      pkt = nil
      loop do
        pkt = read
        break if pkt.eof?

        rows << record_class.new(pkt, @fields, enc)
      end
      pkt.utiny  # 0xFE
      pkt.ushort # warning count, not used
      @server_status = pkt.ushort
      @no_more_records = true
      rows
    ensure
      set_state(more_results? ? :WAIT_RESULT : :READY)
    end
  end
end
Retrieve all records for simple query or prepared statement @param record_class [RawRecord or StmtRawRecord] @return [Array<record_class>] all records
Source
# File lib/mysql/protocol.rb, line 342
# Read @field_count field packets followed by an EOF packet.
# Moves the connection from :FIELD to :RESULT (to :READY on error).
#
# @return [Array<Mysql::Field>] field list
def retr_fields
  synchronize(before: :FIELD, after: :RESULT, error: :READY) do
    @fields = @field_count.times.map do
      Field.new(FieldPacket.parse(read))
    end
    read_eof_packet
    @no_more_records = false
    @fields
  end
end
Retrieve n fields @return [Array<Mysql::Field>] field list
Source
# File lib/mysql/protocol.rb, line 355
# Read the next row of the current result set.
#
# @param record_class [Class] class instantiated as new(pkt, fields, encoding)
# @return [Object] a record_class instance
# @return [nil] no more records
def retr_record(record_class)
  return nil if @no_more_records

  synchronize(before: :RESULT) do
    enc = charset.encoding
    pkt = read
    return record_class.new(pkt, @fields, enc) unless pkt.eof?

    # EOF packet: marker byte, warning count (unused), then server status.
    pkt.utiny
    pkt.ushort
    @server_status = pkt.ushort
    set_state(more_results? ? :WAIT_RESULT : :READY)
    @no_more_records = true
    return nil
  end
end
Retrieve one record for simple query or prepared statement @param record_class [RawRecord or StmtRawRecord] @return [<record_class>] record @return [nil] no more record
Source
# File lib/mysql/protocol.rb, line 328
# Send the contents of a local file to the server, followed by an empty
# packet, when that is permitted.
#
# The file is sent when opts[:local_infile] is truthy, or when the file lies
# inside opts[:load_data_local_dir]. "Inside" is checked on a directory
# boundary, so a dir of "/srv/data" does not allow "/srv/database/x". An
# empty dir string is treated as not configured.
#
# @param filename [String] path to the file to send
# @raise [ClientError::LoadDataLocalInfileRejected] the file is not permitted
def send_local_file(filename)
  filename = File.absolute_path(filename)

  allowed = @opts[:local_infile]
  dir = @opts[:load_data_local_dir]
  if !allowed && dir && !dir.empty?
    base = File.absolute_path(dir)
    # A plain prefix test would also accept sibling paths that merely share
    # the prefix, so compare against the directory plus a separator.
    base += File::SEPARATOR unless base.end_with?(File::SEPARATOR)
    allowed = filename.start_with?(base)
  end

  if allowed
    File.open(filename){|f| write f}
    write nil # EOF
  else
    write nil # send empty data instead of file contents
    read # result packet
    raise ClientError::LoadDataLocalInfileRejected, 'LOAD DATA LOCAL INFILE file request rejected due to restrictions on access.'
  end
end
send local file to server
Source
# File lib/mysql/protocol.rb, line 411
# Send COM_SET_OPTION.
#
# @param opt [Integer] option value packed as a 16-bit little-endian integer
# @return [String] data received in response
def set_option_command(opt)
  payload = [COM_SET_OPTION, opt].pack("Cv")
  simple_command(payload)
end
Set option command
Source
# File lib/mysql/protocol.rb, line 479
# Record the new connection state. On entering :READY with an open socket,
# also send COM_STMT_CLOSE for every statement id queued by gc_stmt.
#
# @param st [Symbol] new state
def set_state(st)
  @state = st
  return unless st == :READY
  return if @gc_stmt_queue.empty?
  return if @socket&.closed?

  # GC.disable returns true when GC was already disabled; in that case it is left disabled.
  gc_was_disabled = GC.disable
  begin
    while (stmt_id = @gc_stmt_queue.shift)
      reset
      write [COM_STMT_CLOSE, stmt_id].pack("CV")
    end
  ensure
    GC.enable unless gc_was_disabled
  end
end
Source
# File lib/mysql/protocol.rb, line 416
# Send COM_SHUTDOWN.
#
# @param level [Integer] value packed as a single byte after the command byte
# @return [String] data received in response
def shutdown_command(level)
  payload = [COM_SHUTDOWN, level].pack("CC")
  simple_command(payload)
end
Shutdown command
Source
# File lib/mysql/protocol.rb, line 635
# Send one command packet and return the single packet read in response.
# The connection must be :READY and stays :READY.
#
# @param packet [String] packet data
# @return [String] received data
def simple_command(packet)
  synchronize(before: :READY, after: :READY) do
    reset
    write(packet)
    response = read
    response.to_s
  end
end
Send simple command
@param packet [String] packet data
@return [String] received data
Source
# File lib/mysql/protocol.rb, line 276
# @return [Object, nil] the socket's cipher when CLIENT_SSL is set in the client flags, otherwise nil
def ssl_cipher
  return nil unless @client_flags.allbits?(CLIENT_SSL)

  @socket.cipher
end
Source
# File lib/mysql/protocol.rb, line 421
# Send COM_STATISTICS.
#
# @return [String] data received in response
def statistics_command
  payload = [COM_STATISTICS].pack("C")
  simple_command(payload)
end
Statistics command
Source
# File lib/mysql/protocol.rb, line 460
# Send COM_STMT_CLOSE for a statement and drop it from the deferred-close queue.
#
# Any pending result is consumed first. The three checks run in sequence
# because each step can leave the connection in the state the next one tests.
#
# @param stmt_id [Integer] statement id
def stmt_close_command(stmt_id)
  if @state == :WAIT_RESULT
    get_result
  end
  if @state == :FIELD
    retr_fields
  end
  if @state == :RESULT
    retr_all_records(StmtRawRecord)
  end

  synchronize(before: :READY, after: :READY) do
    reset
    write [COM_STMT_CLOSE, stmt_id].pack("CV")
    @gc_stmt_queue.delete(stmt_id)
  end
end
Stmt close command @param stmt_id [Integer] statement id
Source
# File lib/mysql/protocol.rb, line 451
# Send the execute packet for a prepared statement (no cursor).
# Moves the connection from :READY to :WAIT_RESULT (back to :READY on error).
#
# @param stmt_id [Integer] statement id
# @param values [Array] parameters
def stmt_execute_command(stmt_id, values)
  synchronize(before: :READY, after: :WAIT_RESULT, error: :READY) do
    reset
    packet = ExecutePacket.serialize(stmt_id, Mysql::Stmt::CURSOR_TYPE_NO_CURSOR, values)
    write packet
  end
end
Stmt execute command @param stmt_id [Integer] statement id @param values [Array] parameters @return [Integer] number of fields
Source
# File lib/mysql/protocol.rb, line 428
# Send COM_STMT_PREPARE and read the statement description.
#
# @param stmt [String] prepared statement
# @return [Array<Integer, Integer, Array<Field>>] statement id, number of parameters, field list
def stmt_prepare_command(stmt)
  synchronize(before: :READY, after: :READY) do
    reset
    write [COM_STMT_PREPARE, charset.convert(stmt)].pack("Ca*")
    prepared = PrepareResultPacket.parse(read)

    if prepared.param_count > 0
      prepared.param_count.times { read } # skip parameter packet
      read_eof_packet
    end

    fields = []
    if prepared.field_count > 0
      fields = prepared.field_count.times.map do
        Field.new(FieldPacket.parse(read))
      end
      read_eof_packet
    end

    return [prepared.statement_id, prepared.param_count, fields]
  end
end
Stmt prepare command @param stmt [String] prepared statement @return [Array<Integer, Integer, Array<Field>>] statement id, number of parameters, field list
Source
# File lib/mysql/protocol.rb, line 493
# Run the block under the connection mutex with state checks and transitions.
#
# @param before [Symbol, nil] state required before the block runs (checked when given)
# @param after [Symbol, nil] state set when the block finishes without a rescued error
# @param error [Symbol, nil] state set when the block raises a StandardError
# @return [Object] the block's value
def synchronize(before: nil, after: nil, error: nil)
  @mutex.synchronize do
    check_state(before) if before
    failed = false
    begin
      return yield
    rescue StandardError
      set_state(error) if error
      failed = true
      raise
    ensure
      # Also runs when the block exits early via return/break.
      set_state(after) if after && !failed
    end
  end
end
Source
# File lib/mysql/protocol.rb, line 580
# Write packet data, split into chunks of at most MAX_PACKET_LENGTH bytes,
# each with its own header and sequence number.
#
# @param data [String, IO, nil] packet data. If data is nil, write empty packet.
# @raise [ClientError::ServerGoneError] broken pipe or TLS error while writing
# @raise [ClientError] write timeout
def write(data)
  # The connect timeout applies until authentication has left the :INIT state.
  timeout = @state == :INIT ? @opts[:connect_timeout] : @opts[:write_timeout]
  @socket.sync = false
  if data.nil?
    write_timeout([0, 0, @seq].pack("CvC"), timeout)
    @seq = (@seq + 1) % 256
  else
    source = data.is_a?(String) ? StringIO.new(data) : data
    while (chunk = source.read(MAX_PACKET_LENGTH))
      # Header: 3-byte little-endian length (low byte + 16-bit remainder), then the sequence number.
      high, low = chunk.length.divmod(256)
      write_timeout([low, high, @seq].pack("CvC") + chunk, timeout)
      @seq = (@seq + 1) % 256
    end
  end
  @socket.sync = true
  @socket.flush
rescue Errno::EPIPE, OpenSSL::SSL::SSLError
  close
  raise ClientError::ServerGoneError, 'MySQL server has gone away'
rescue Errno::ETIMEDOUT
  raise ClientError, "write timeout"
end
Write one packet data @param data [String, IO, nil] packet data. If data is nil, write empty packet.
Source
# File lib/mysql/protocol.rb, line 602
# Write all of +data+ to the socket, giving up after +timeout+ seconds.
#
# Progress is tracked in bytes, because write_nonblock reports bytes written;
# counting characters would resend or skip bytes after a partial write of a
# string containing multibyte characters.
#
# @param data [String] bytes to send
# @param timeout [Numeric, nil] overall deadline in seconds; nil or 0 means a plain blocking write
# @return [Integer] number of bytes written
# @raise [Errno::ETIMEDOUT] the deadline passed before everything was written
def write_timeout(data, timeout)
  return @socket.write(data) if timeout.nil? || timeout == 0

  total = data.bytesize
  sent = 0
  # Monotonic clock: the deadline must not move when the wall clock is adjusted.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  while sent < total
    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    raise Errno::ETIMEDOUT if now > deadline

    written = @socket.write_nonblock(data.byteslice(sent, total - sent), exception: false)
    case written
    when :wait_readable
      IO.select([@socket], nil, nil, deadline - now) # rubocop:disable Lint/IncompatibleIoSelectWithFiberScheduler
    when :wait_writable
      IO.select(nil, [@socket], nil, deadline - now) # rubocop:disable Lint/IncompatibleIoSelectWithFiberScheduler
    else
      sent += written
    end
  end
  sent
end