# encoding: ASCII-8BIT
# frozen_string_literal: false
#
# Origin: test/test_buffer.rb (blob c4fc9f20da3fba6b0a2fbf080ec9dfff80466495)

require_relative 'common'
require 'net/ssh/buffer'

# Unit tests for Net::SSH::Buffer.
#
# The cases below cover building buffers with Buffer.from, the read_* and
# write_* primitives (byte, long, int64, bool, string, bignum), position
# handling (read, consume!, reset!, clear!, append) and reading/writing
# DSA, RSA and ECDSA public keys.
class TestBuffer < NetSSHTest
  # Exclusive upper bound passed to Kernel#rand when the key helpers need
  # random big numbers.
  RANDOM_BN_LIMIT = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF

  def test_constructor_should_initialize_buffer_to_empty_by_default
    buffer = new
    assert buffer.empty?
    assert_equal 0, buffer.position
  end

  def test_constructor_with_string_should_initialize_buffer_to_the_string
    buffer = new("hello")
    assert !buffer.empty?
    assert_equal "hello", buffer.to_s
    assert_equal 0, buffer.position
  end

  def test_from_should_require_an_even_number_of_arguments
    assert_raises(ArgumentError) { Net::SSH::Buffer.from("this") }
  end

  def test_from_should_build_new_buffer_from_definition
    buffer = Net::SSH::Buffer.from(:byte, 1, :long, 2, :int64, 3, :string, "4", :bool, true, :bool, false, :bignum, OpenSSL::BN.new("1234567890", 10), :raw, "something")
    # Expected layout, in argument order:
    #   byte 1            -> \1
    #   long 2            -> \0\0\0\2
    #   int64 3           -> \0\0\0\0\0\0\0\3
    #   string "4"        -> \0\0\0\001 (length) followed by "4"
    #   bool true / false -> \1 \0
    #   bignum 1234567890 -> \000\000\000\004 (length) followed by I\226\002\322
    #   raw "something"   -> copied as-is
    assert_equal "\1\0\0\0\2\0\0\0\0\0\0\0\3\0\0\0\0014\1\0\000\000\000\004I\226\002\322something", buffer.to_s
  end

  def test_from_should_build_new_buffer_that_includes_utf8_string_128_characters
    length = 128
    # letter A has a 1 byte UTF8 representation
    buffer = Net::SSH::Buffer.from(:long, 2, :string, 'A' * length)
    # long of 2 + length 128 as network endian + 128 A's
    expected = "\x00\x00\x00\x02" + [length].pack('N*') + ("\x41" * length)
    assert_equal expected, buffer.to_s
  end

  def test_from_should_build_new_buffer_with_frozen_strings
    foo = 'foo'.freeze
    buffer = Net::SSH::Buffer.from(:string, foo)
    assert_equal "\0\0\0\3foo", buffer.to_s
  end

  def test_from_should_build_new_buffer_with_utf8_frozen_strings
    foo = "\u2603".freeze
    buffer = Net::SSH::Buffer.from(:string, foo)
    assert_equal "\0\0\0\3\u2603".force_encoding('BINARY'), buffer.to_s
  end

  # Buffer.from must leave the caller's string untouched: after the call the
  # argument is still reported as UTF-8.
  def test_from_should_not_change_regular_paramaters
    foo = "\u2603"
    buffer = Net::SSH::Buffer.from(:string, foo)
    assert_equal "\0\0\0\3\u2603".force_encoding('BINARY'), buffer.to_s
    # Expected value goes first so a failure message reads correctly.
    assert_equal "UTF-8", foo.encoding.to_s
  end

  def test_from_with_array_argument_should_write_multiple_of_the_given_type
    buffer = Net::SSH::Buffer.from(:byte, [1, 2, 3, 4, 5])
    assert_equal "\1\2\3\4\5", buffer.to_s
  end

  def test_from_should_measure_bytesize_of_utf_8_string_correctly
    buffer = Net::SSH::Buffer.from(:string, "\u2603") # Snowman is 3 bytes
    assert_equal "\0\0\0\3\u2603".force_encoding('BINARY'), buffer.to_s
  end

  def test_read_without_argument_should_read_to_end
    buffer = new("hello world")
    assert_equal "hello world", buffer.read
    assert buffer.eof?
    assert_equal 11, buffer.position
  end

  def test_read_with_argument_that_is_less_than_length_should_read_that_many_bytes
    buffer = new "hello world"
    assert_equal "hello", buffer.read(5)
    assert_equal 5, buffer.position
  end

  def test_read_with_argument_that_is_more_than_length_should_read_no_more_than_length
    buffer = new "hello world"
    assert_equal "hello world", buffer.read(500)
    assert_equal 11, buffer.position
  end

  def test_read_at_eof_should_return_empty_string
    buffer = new "hello"
    buffer.position = 5
    assert_equal "", buffer.read
  end

  def test_consume_without_argument_should_resize_buffer_to_start_at_position
    buffer = new "hello world"
    buffer.read(5)
    assert_equal 5, buffer.position
    assert_equal 11, buffer.length
    buffer.consume!
    assert_equal 0, buffer.position
    assert_equal 6, buffer.length
    assert_equal " world", buffer.to_s
  end

  def test_consume_with_argument_should_resize_buffer_starting_at_n
    buffer = new "hello world"
    assert_equal 0, buffer.position
    buffer.consume!(5)
    assert_equal 0, buffer.position
    assert_equal 6, buffer.length
    assert_equal " world", buffer.to_s
  end

  def test_read_bang_should_read_and_consume_and_return_read_portion
    buffer = new "hello world"
    assert_equal "hello", buffer.read!(5)
    assert_equal 0, buffer.position
    assert_equal 6, buffer.length
    assert_equal " world", buffer.to_s
  end

  def test_available_should_return_length_after_position_to_end_of_string
    buffer = new "hello world"
    buffer.read(5)
    assert_equal 6, buffer.available
  end

  def test_clear_bang_should_reset_buffer_contents_and_counters
    buffer = new "hello world"
    buffer.read(5)
    buffer.clear!
    assert_equal 0, buffer.length
    assert_equal 0, buffer.position
    assert_equal "", buffer.to_s
  end

  def test_append_should_append_argument_without_changing_position_and_should_return_self
    buffer = new "hello world"
    buffer.read(5)
    result = buffer.append(" again")
    # The "should return self" half of this test's name.
    assert_same buffer, result
    assert_equal 5, buffer.position
    assert_equal 12, buffer.available
    assert_equal 17, buffer.length
    assert_equal "hello world again", buffer.to_s
  end

  def test_remainder_as_buffer_should_return_a_new_buffer_filled_with_the_text_after_the_current_position
    buffer = new "hello world"
    buffer.read(6)
    b2 = buffer.remainder_as_buffer
    assert_equal 6, buffer.position
    assert_equal 0, b2.position
    assert_equal "world", b2.to_s
  end

  def test_read_int64_should_return_8_byte_integer
    buffer = new "\xff\xee\xdd\xcc\xbb\xaa\x99\x88"
    assert_equal 0xffeeddccbbaa9988, buffer.read_int64
    assert_equal 8, buffer.position
  end

  def test_read_int64_should_return_nil_on_partial_read
    # Only 7 bytes are available, one short of an int64.
    buffer = new "\0\0\0\0\0\0\0"
    assert_nil buffer.read_int64
    assert buffer.eof?
  end

  def test_read_long_should_return_4_byte_integer
    buffer = new "\xff\xee\xdd\xcc\xbb\xaa\x99\x88"
    assert_equal 0xffeeddcc, buffer.read_long
    assert_equal 4, buffer.position
  end

  def test_read_long_should_return_nil_on_partial_read
    # Only 3 bytes are available, one short of a long.
    buffer = new "\0\0\0"
    assert_nil buffer.read_long
    assert buffer.eof?
  end

  def test_read_byte_should_return_single_byte_integer
    buffer = new "\xfe\xdc"
    assert_equal 0xfe, buffer.read_byte
    assert_equal 1, buffer.position
  end

  def test_read_byte_should_return_nil_at_eof
    assert_nil new.read_byte
  end

  def test_read_string_should_read_length_and_data_from_buffer
    # \x0b == 11, the length of "hello world".
    buffer = new "\0\0\0\x0bhello world"
    assert_equal "hello world", buffer.read_string
  end

  def test_read_string_should_return_nil_if_4_byte_length_cannot_be_read
    assert_nil new("\0\1").read_string
  end

  def test_read_bool_should_return_true_if_non_zero_byte_is_read
    buffer = new "\1\2\3\4\5\6"
    6.times { assert_equal true, buffer.read_bool }
  end

  def test_read_bool_should_return_false_if_zero_byte_is_read
    buffer = new "\0"
    assert_equal false, buffer.read_bool
  end

  def test_read_bool_should_return_nil_at_eof
    assert_nil new.read_bool
  end

  def test_read_bignum_should_read_openssl_formatted_bignum
    buffer = new("\000\000\000\004I\226\002\322")
    assert_equal OpenSSL::BN.new("1234567890", 10), buffer.read_bignum
  end

  def test_read_bignum_should_return_nil_if_length_cannot_be_read
    assert_nil new("\0\1\2").read_bignum
  end

  # The assertions for the key-reading tests live in the random_* helpers at
  # the bottom of the class; each test only supplies how the key is read.
  def test_read_key_blob_should_read_dsa_keys
    random_dss { |buffer| buffer.read_keyblob("ssh-dss") }
  end

  def test_read_key_blob_should_read_rsa_keys
    random_rsa { |buffer| buffer.read_keyblob("ssh-rsa") }
  end

  def test_read_key_should_read_dsa_key_type_and_keyblob
    random_dss do |buffer|
      b2 = Net::SSH::Buffer.from(:string, "ssh-dss", :raw, buffer)
      b2.read_key
    end
  end

  def test_read_key_should_read_rsa_key_type_and_keyblob
    random_rsa do |buffer|
      b2 = Net::SSH::Buffer.from(:string, "ssh-rsa", :raw, buffer)
      b2.read_key
    end
  end

  def test_read_buffer_should_read_a_string_and_return_it_wrapped_in_a_buffer
    buffer = new("\0\0\0\x0bhello world")
    b2 = buffer.read_buffer
    assert_equal 0, b2.position
    assert_equal 11, b2.length
    assert_equal "hello world", b2.read
  end

  def test_read_to_should_return_nil_if_pattern_does_not_exist_in_buffer
    buffer = new("one two three")
    assert_nil buffer.read_to("\n")
  end

  def test_read_to_should_grok_string_patterns
    buffer = new("one two three")
    assert_equal "one tw", buffer.read_to("tw")
    assert_equal 6, buffer.position
  end

  def test_read_to_should_grok_regex_patterns
    buffer = new("one two three")
    assert_equal "one tw", buffer.read_to(/tw/)
    assert_equal 6, buffer.position
  end

  # NOTE(review): `?w` is a character literal, which on Ruby 1.9+ evaluates to
  # the one-character String "w" rather than an Integer, so this exercises a
  # String pattern just like the string test above. Confirm whether Integer
  # patterns are still meant to be supported before changing the argument.
  def test_read_to_should_grok_fixnum_patterns
    buffer = new("one two three")
    assert_equal "one tw", buffer.read_to(?w)
    assert_equal 6, buffer.position
  end

  def test_reset_bang_should_reset_position_to_0
    buffer = new("hello world")
    buffer.read(5)
    assert_equal 5, buffer.position
    buffer.reset!
    assert_equal 0, buffer.position
  end

  def test_write_should_write_arguments_directly_to_end_buffer
    buffer = new("start")
    buffer.write "hello", " ", "world"
    assert_equal "starthello world", buffer.to_s
    assert_equal 0, buffer.position
  end

  def test_write_int64_should_write_arguments_as_8_byte_integers_to_end_of_buffer
    buffer = new("start")
    buffer.write_int64 0xffeeddccbbaa9988, 0x7766554433221100
    assert_equal "start\xff\xee\xdd\xcc\xbb\xaa\x99\x88\x77\x66\x55\x44\x33\x22\x11\x00", buffer.to_s
  end

  def test_write_long_should_write_arguments_as_4_byte_integers_to_end_of_buffer
    buffer = new("start")
    buffer.write_long 0xffeeddcc, 0xbbaa9988
    assert_equal "start\xff\xee\xdd\xcc\xbb\xaa\x99\x88", buffer.to_s
  end

  def test_write_byte_should_write_arguments_as_1_byte_integers_to_end_of_buffer
    buffer = new("start")
    buffer.write_byte 1, 2, 3, 4, 5
    assert_equal "start\1\2\3\4\5", buffer.to_s
  end

  def test_write_bool_should_write_arguments_as_1_byte_boolean_values_to_end_of_buffer
    buffer = new("start")
    # nil and false are written as \0; every other value as \1.
    buffer.write_bool nil, false, true, 1, Object.new
    assert_equal "start\0\0\1\1\1", buffer.to_s
  end

  def test_write_bignum_should_write_arguments_as_ssh_formatted_bignum_values_to_end_of_buffer
    buffer = new("start")
    buffer.write_bignum OpenSSL::BN.new('1234567890', 10)
    assert_equal "start\000\000\000\004I\226\002\322", buffer.to_s
  end

  def test_write_dss_key_should_write_argument_to_end_of_buffer
    buffer = new("start")

    key = OpenSSL::PKey::DSA.new
    # Use the bulk setters when the OpenSSL binding provides them, otherwise
    # fall back to the per-attribute writers. Both branches assign the same
    # p, q, g and pub_key values.
    if key.respond_to?(:set_pqg)
      key.set_pqg(0xffeeddccbbaa9988, 0x7766554433221100, 0xffddbb9977553311)
      key.set_key(0xeeccaa8866442200, nil)
    else
      key.p = 0xffeeddccbbaa9988
      key.q = 0x7766554433221100
      key.g = 0xffddbb9977553311
      key.pub_key = 0xeeccaa8866442200
    end

    buffer.write_key(key)
    # Key type string followed by p, q, g and pub_key, each length-prefixed.
    assert_equal "start\0\0\0\7ssh-dss\0\0\0\011\0\xff\xee\xdd\xcc\xbb\xaa\x99\x88\0\0\0\010\x77\x66\x55\x44\x33\x22\x11\x00\0\0\0\011\0\xff\xdd\xbb\x99\x77\x55\x33\x11\0\0\0\011\0\xee\xcc\xaa\x88\x66\x44\x22\x00", buffer.to_s
  end

  def test_write_rsa_key_should_write_argument_to_end_of_buffer
    buffer = new("start")

    key = OpenSSL::PKey::RSA.new
    # Same fallback as the DSA test. Note the argument order of set_key:
    # the first value is the one assigned to n below, the second to e.
    if key.respond_to?(:set_key)
      key.set_key(0x7766554433221100, 0xffeeddccbbaa9988, nil)
    else
      key.e = 0xffeeddccbbaa9988
      key.n = 0x7766554433221100
    end

    buffer.write_key(key)
    # Key type string followed by e and then n, each length-prefixed.
    assert_equal "start\0\0\0\7ssh-rsa\0\0\0\011\0\xff\xee\xdd\xcc\xbb\xaa\x99\x88\0\0\0\010\x77\x66\x55\x44\x33\x22\x11\x00", buffer.to_s
  end

  def test_read_key_blob_should_read_ecdsa_sha2_nistp256_keys
    random_ecdsa_sha2_nistp256 do |buffer|
      buffer.read_keyblob('ecdsa-sha2-nistp256')
    end
  end

  def test_read_key_blob_should_read_ecdsa_sha2_nistp384_keys
    random_ecdsa_sha2_nistp384 do |buffer|
      buffer.read_keyblob('ecdsa-sha2-nistp384')
    end
  end

  def test_read_key_blob_should_read_ecdsa_sha2_nistp521_keys
    random_ecdsa_sha2_nistp521 do |buffer|
      buffer.read_keyblob('ecdsa-sha2-nistp521')
    end
  end

  def test_read_key_should_read_ecdsa_sha2_nistp256_key_type_and_keyblob
    random_ecdsa_sha2_nistp256 do |buffer|
      b2 = Net::SSH::Buffer.from(:string, 'ecdsa-sha2-nistp256', :raw, buffer)
      b2.read_key
    end
  end

  def test_read_key_should_read_ecdsa_sha2_nistp384_key_type_and_keyblob
    random_ecdsa_sha2_nistp384 do |buffer|
      b2 = Net::SSH::Buffer.from(:string, 'ecdsa-sha2-nistp384', :raw, buffer)
      b2.read_key
    end
  end

  def test_read_key_should_read_ecdsa_sha2_nistp521_key_type_and_keyblob
    random_ecdsa_sha2_nistp521 do |buffer|
      b2 = Net::SSH::Buffer.from(:string, 'ecdsa-sha2-nistp521', :raw, buffer)
      b2.read_key
    end
  end

  # The three write_ecdsa tests load a fixed private key from PEM so that the
  # serialized public point can be compared against a literal byte string.
  def test_write_ecdsa_sha2_nistp256_key_should_write_argument_to_end_of_buffer
    buffer = new('start')
    key = OpenSSL::PKey::EC.new("-----BEGIN EC PRIVATE KEY-----\nMHcCAQEEIISGj5vAJCWt2KPI8NwaWVDSNLl2vbRxDIOkY+n6O0VVoAoGCCqGSM49\nAwEHoUQDQgAEnKbs0yEogTKT4QRu8T9nb2svl2mEWXb6g224oCpD2o6TYNXNw54H\nmWkdCv+kFCqSlfSi5fqFhrXdfEY6zSzQYQ==\n-----END EC PRIVATE KEY-----\n")

    buffer.write_key(key)
    assert_equal "start\000\000\000\023ecdsa-sha2-nistp256\000\000\000\bnistp256\000\000\000A\004\234\246\354\323!(\2012\223\341\004n\361?gok/\227i\204Yv\372\203m\270\240*C\332\216\223`\325\315\303\236\a\231i\035\n\377\244\024*\222\225\364\242\345\372\205\206\265\335|F:\315,\320a", buffer.to_s
  end

  def test_write_ecdsa_sha2_nistp384_key_should_write_argument_to_end_of_buffer
    buffer = new('start')
    key = OpenSSL::PKey::EC.new("-----BEGIN EC PRIVATE KEY-----\nMIGkAgEBBDBAfxJpzhsR7O+wMol6BcDgualR8rJBvYegUDYbBUrDnPzDx2/gD1lZ\nnwG1FuD2s9igBwYFK4EEACKhZANiAATsfiU4Kxyvvj1DdvFYsdDnZIT7loRlan9I\n8geCWPPl6x7NFRP+awrnTaarMgieGqxG8IQaIA0SsDOICfbDBkuatRi0S1Et/in4\nZwVEZvO81Ro5YSrjuUDAsytnI6OXS28=\n-----END EC PRIVATE KEY-----\n")

    buffer.write_key(key)
    assert_equal "start\000\000\000\023ecdsa-sha2-nistp384\000\000\000\bnistp384\000\000\000a\004\354~%8+\034\257\276=Cv\361X\261\320\347d\204\373\226\204ej\177H\362\a\202X\363\345\353\036\315\025\023\376k\n\347M\246\2532\b\236\032\254F\360\204\032 \r\022\2603\210\t\366\303\006K\232\265\030\264KQ-\376)\370g\005Df\363\274\325\0329a*\343\271@\300\263+g#\243\227Ko", buffer.to_s
  end

  def test_write_ecdsa_sha2_nistp521_key_should_write_argument_to_end_of_buffer
    buffer = new('start')
    key = OpenSSL::PKey::EC.new("-----BEGIN EC PRIVATE KEY-----\nMIHbAgEBBEGhnQF/SFo4Vym88HnCfc6BR8WwYqDh9wNTPeqzR8auxIpp0GKQlCG2\nuHzyteJX5/YalV8empYhEzNmNLNn8x7j0aAHBgUrgQQAI6GBiQOBhgAEAYygOgV9\nVI8UyLQ3BDlv+rb3es+ufrIcj++cqcc9QcmRn237NiWRr/1NKy2AKijsEdACtZXo\nxPC0x9Vs9ieC2oR+ANOBubcxPl2giDnBYm8ywAmmlXsP5ByAM17k97CzW5O+Z/uO\nbxGUzzhoXTNcjqpAckhRVKdnh6FL/rKelT0tBYi+\n-----END EC PRIVATE KEY-----\n")

    buffer.write_key(key)
    assert_equal "start\000\000\000\023ecdsa-sha2-nistp521\000\000\000\bnistp521\000\000\000\205\004\001\214\240:\005}T\217\024\310\2647\0049o\372\266\367z\317\256~\262\034\217\357\234\251\307=A\311\221\237m\3736%\221\257\375M+-\200*(\354\021\320\002\265\225\350\304\360\264\307\325l\366'\202\332\204~\000\323\201\271\2671>]\240\2109\301bo2\300\t\246\225{\017\344\034\2003^\344\367\260\263[\223\276g\373\216o\021\224\3178h]3\\\216\252@rHQT\247g\207\241K\376\262\236\225=-\005\210\276", buffer.to_s
  end

  def test_write_ec_point_should_write_argument_to_end_of_buffer
    buffer = new('start')
    # A freshly constructed point on the nistp256 group, with no coordinates set.
    key = OpenSSL::PKey::EC::Point.new(OpenSSL::PKey::EC.new(OpenSSL::PKey::EC::CurveNameAlias['nistp256']).group)

    buffer.write_key(key)
    # The trailing four zero bytes are the length prefix of an empty point string.
    assert_equal "start\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00\bnistp256\x00\x00\x00\x00", buffer.to_s
  end

  private

  # Returns a random non-negative OpenSSL::BN below RANDOM_BN_LIMIT.
  def random_bn
    OpenSSL::BN.new(rand(RANDOM_BN_LIMIT).to_s, 10)
  end

  # Builds a buffer holding two random bignums, yields it, and asserts that
  # the block's return value is an "ssh-rsa" key whose e and n match the
  # first and second bignum respectively.
  def random_rsa
    e, n = Array.new(2) { random_bn }
    buffer = Net::SSH::Buffer.from(:bignum, [e, n])
    key = yield(buffer)
    assert_equal "ssh-rsa", key.ssh_type
    assert_equal e, key.e
    assert_equal n, key.n
  end

  # Builds a buffer holding four random bignums, yields it, and asserts that
  # the block's return value is an "ssh-dss" key whose p, q, g and pub_key
  # match the bignums in that order.
  def random_dss
    p, q, g, pub_key = Array.new(4) { random_bn }
    buffer = Net::SSH::Buffer.from(:bignum, [p, q, g, pub_key])
    key = yield(buffer)
    assert_equal "ssh-dss", key.ssh_type
    assert_equal p, key.p
    assert_equal q, key.q
    assert_equal g, key.g
    assert_equal pub_key, key.pub_key
  end

  # Shared body of the random_ecdsa_sha2_* helpers.
  #
  # Generates a key on +openssl_curve+, builds a buffer holding +ssh_curve+
  # and the binary public point (both as strings), yields the buffer, and
  # asserts that the block's return value has ssh_type
  # "ecdsa-sha2-<ssh_curve>" and the same public key.
  def random_ecdsa(openssl_curve, ssh_curve)
    k = OpenSSL::PKey::EC.new(openssl_curve).generate_key
    buffer = Net::SSH::Buffer.from(:string, ssh_curve,
                                   :string, k.public_key.to_bn.to_s(2))
    key = yield(buffer)
    assert_equal "ecdsa-sha2-#{ssh_curve}", key.ssh_type
    assert_equal k.public_key, key.public_key
  end

  def random_ecdsa_sha2_nistp256(&block)
    random_ecdsa('prime256v1', 'nistp256', &block)
  end

  def random_ecdsa_sha2_nistp384(&block)
    random_ecdsa('secp384r1', 'nistp384', &block)
  end

  def random_ecdsa_sha2_nistp521(&block)
    random_ecdsa('secp521r1', 'nistp521', &block)
  end

  # Shorthand for Net::SSH::Buffer.new, forwarding all arguments.
  def new(*args)
    Net::SSH::Buffer.new(*args)
  end
end