# This file is part of cloud-init. See LICENSE file for license information. import os import stat from functools import partial from textwrap import dedent from typing import NamedTuple from unittest import mock from unittest.mock import patch import pytest from cloudinit import ssh_util, util M_PATH = "cloudinit.ssh_util." class FakePwEnt(NamedTuple): pw_name: str = "UNSET_pw_name" pw_passwd: str = "UNSET_w_passwd" pw_uid: str = "UNSET_pw_uid" pw_gid: str = "UNSET_pw_gid" pw_gecos: str = "UNSET_pw_gecos" pw_dir: str = "UNSET_pw_dir" pw_shell: str = "UNSET_pw_shell" def mock_get_owner(updated_permissions, value): try: return updated_permissions[value][0] except ValueError: return util.get_owner(value) def mock_get_group(updated_permissions, value): try: return updated_permissions[value][1] except ValueError: return util.get_group(value) def mock_get_user_groups(username): return username def mock_get_permissions(updated_permissions, value): try: return updated_permissions[value][2] except ValueError: return util.get_permissions(value) def mock_getpwnam(users, username): return users[username] # Do not use these public keys, most of them are fetched from # the testdata for OpenSSH, and their private keys are available # https://github.com/openssh/openssh-portable/tree/master/regress/unittests/sshkey/testdata VALID_CONTENT = { "dsa": ( "AAAAB3NzaC1kc3MAAACBAIrjOQSlSea19bExXBMBKBvcLhBoVvNBjCppNzllipF" "W4jgIOMcNanULRrZGjkOKat6MWJNetSbV1E6IOFDQ16rQgsh/OvYU9XhzM8seLa" "A21VszZuhIV7/2DE3vxu7B54zVzueG1O1Deq6goQCRGWBUnqO2yluJiG4HzrnDa" "jzRAAAAFQDMPO96qXd4F5A+5b2f2MO7SpVomQAAAIBpC3K2zIbDLqBBs1fn7rsv" "KcJvwihdlVjG7UXsDB76P2GNqVG+IlYPpJZ8TO/B/fzTMtrdXp9pSm9OY1+BgN4" "REsZ2WNcvfgY33aWaEM+ieCcQigvxrNAF2FTVcbUIIxAn6SmHuQSWrLSfdHc8H7" "hsrgeUPPdzjBD/cv2ZmqwZ1AAAAIAplIsScrJut5wJMgyK1JG0Kbw9JYQpLe95P" "obB069g8+mYR8U0fysmTEdR44mMu0VNU5E5OhTYoTGfXrVrkR134LqFM2zpVVbE" "JNDnIqDHxTkc6LY2vu8Y2pQ3/bVnllZZOda2oD5HQ7ovygQa6CH+fbaZHbdDUX/" "5z7u2rVAlDw==" ), "ecdsa": ( "AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBITrGBB3cgJ" "J7fPxvtMW9H3oRisNpJ3OAslxZeyP7I0A9BPAW0RQIwHVtVnM7zrp4nI+JLZov/" "Ql7lc2leWL7CY=" ), "rsa": ( "AAAAB3NzaC1yc2EAAAABIwAAAQEA3I7VUf2l5gSn5uavROsc5HRDpZdQueUq5oz" "emNSj8T7enqKHOEaFoU2VoPgGEWC9RyzSQVeyD6s7APMcE82EtmW4skVEgEGSbD" "c1pvxzxtchBj78hJP6Cf5TCMFSXw+Fz5rF1dR23QDbN1mkHs7adr8GW4kSWqU7Q" "7NDwfIrJJtO7Hi42GyXtvEONHbiRPOe8stqUly7MvUoN+5kfjBM8Qqpfl2+FNhT" "YWpMfYdPUnE7u536WqzFmsaqJctz3gBxH9Ex7dFtrxR4qiqEr9Qtlu3xGn7Bw07" "/+i1D+ey3ONkZLN+LQ714cgj8fRS4Hj29SCmXp5Kt5/82cD/VN3NtHw==" ), "ed25519": ( "AAAAC3NzaC1lZDI1NTE5AAAAIA1J77+CrJ8p6/vWCEzuylqJNMHUP/XmeYyGVWb8lnDd" ), "ecdsa-sha2-nistp256-cert-v01@openssh.com": ( "AAAAKGVjZHNhLXNoYTItbmlzdHAyNTYtY2VydC12MDFAb3BlbnNzaC5jb20AAAA" "gQIfwT/+UX68/hlKsdKuaOuAVB6ftTg03SlP/uH4OBEwAAAAIbmlzdHAyNTYAAA" "BBBEjA0gjJmPM6La3sXyfNlnjilvvGY6I2M8SvJj4o3X/46wcUbPWTaj4RF3EXw" "HvNxplYBwdPlk2zEecvf9Cs2BMAAAAAAAAAAAAAAAEAAAAYa2V5cy9lY2RzYS1z" "aGEyLW5pc3RwMjU2AAAAAAAAAAAAAAAA//////////8AAAAAAAAAggAAABVwZXJ" "taXQtWDExLWZvcndhcmRpbmcAAAAAAAAAF3Blcm1pdC1hZ2VudC1mb3J3YXJkaW" "5nAAAAAAAAABZwZXJtaXQtcG9ydC1mb3J3YXJkaW5nAAAAAAAAAApwZXJtaXQtc" "HR5AAAAAAAAAA5wZXJtaXQtdXNlci1yYwAAAAAAAAAAAAAAaAAAABNlY2RzYS1z" "aGEyLW5pc3RwMjU2AAAACG5pc3RwMjU2AAAAQQRH6Y9Q1+ocQ8ETKW3LjQqtxg7" "OuSSDacxmmQatQVaIawwjCbmntyEAqmVj3v9ElDSXnO5m7TyYMBQu4+vsh76RAA" "AAZQAAABNlY2RzYS1zaGEyLW5pc3RwMjU2AAAASgAAACEA47Cl2MMhr+glPGuxx" "2tM3QXkDcwdP0SxSEW5yy4XV5oAAAAhANNMm1cdVlAt3hmycQgdD82zPlg5YvVO" "iN0SQTbgVD8i" ), "ecdsa-sha2-nistp256": ( "AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBEjA0gjJmPM" "6La3sXyfNlnjilvvGY6I2M8SvJj4o3X/46wcUbPWTaj4RF3EXwHvNxplYBwdPlk" "2zEecvf9Cs2BM=" ), "ecdsa-sha2-nistp384-cert-v01@openssh.com": ( "AAAAKGVjZHNhLXNoYTItbmlzdHAzODQtY2VydC12MDFAb3BlbnNzaC5jb20AAAA" "grnSvDsK1EnCZndO1IyGWcGkVgVSkPWi/XO2ybPFyLVUAAAAIbmlzdHAzODQAAA" "BhBAaYSQs+8TT0Tzciy0dorwhur6yzOGUrYQ6ueUQYWbE7eNdHmhsVrlpGPgSaY" "ByhXtAJiPOMqLU5h0eb3sCtM3ek4NvjXFTGTqPrrxJI6q0OsgrtkGE7UM9ZsfMm" "7q6BOAAAAAAAAAAAAAAAAQAAABhrZXlzL2VjZHNhLXNoYTItbmlzdHAzODQAAAA" "AAAAAAAAAAAD//////////wAAAAAAAACCAAAAFXBlcm1pdC1YMTEtZm9yd2FyZG" "luZwAAAAAAAAAXcGVybWl0LWFnZW50LWZvcndhcmRpbmcAAAAAAAAAFnBlcm1pd" "C1wb3J0LWZvcndhcmRpbmcAAAAAAAAACnBlcm1pdC1wdHkAAAAAAAAADnBlcm1p" "dC11c2VyLXJjAAAAAAAAAAAAAACIAAAAE2VjZHNhLXNoYTItbmlzdHAzODQAAAA" "IbmlzdHAzODQAAABhBLWbubcMzcWc7lMTCMGVXZlaVvUOHLjpr6SOOScFFrd8K9" "Gl8nYELST5HZ1gym65m+MG6/tbrUWIY/flLWNIe+WtqxrdPPGdIhFruCwNw2peZ" "SbQOa/o3AGnJ/vO6EKEGAAAAIQAAAATZWNkc2Etc2hhMi1uaXN0cDM4NAAAAGkA" "AAAxAL10JHd5bvnbpD+fet/k1YE1BEIrqGXaoIIJ9ReE5H4nTK1uQJzMD7+wwGK" "RVYqYQgAAADAiit0UCMDAUbjD+R2x4LvU3x/t8G3sdqDLRNfMRpjZpvcS8AwC+Y" "VFVSQNn0AyzW0=" ), "ecdsa-sha2-nistp384": ( "AAAAE2VjZHNhLXNoYTItbmlzdHAzODQAAAAIbmlzdHAzODQAAABhBAaYSQs+8TT" "0Tzciy0dorwhur6yzOGUrYQ6ueUQYWbE7eNdHmhsVrlpGPgSaYByhXtAJiPOMqL" "U5h0eb3sCtM3ek4NvjXFTGTqPrrxJI6q0OsgrtkGE7UM9ZsfMm7q6BOA==" ), "ecdsa-sha2-nistp521-cert-v01@openssh.com": ( "AAAAKGVjZHNhLXNoYTItbmlzdHA1MjEtY2VydC12MDFAb3BlbnNzaC5jb20AAAA" "gGmRzkkMvRFk1V5U3m3mQ2nfW20SJVXk1NKnT5iZGDcEAAAAIbmlzdHA1MjEAAA" "CFBAHosAOHAI1ZkerbKYQ72S6uit1u77PCj/OalZtXgsxv0TTAZB273puG2X94C" "Q8yyNHcby87zFZHdv5BSKyZ/cyREAAeiAcSakop9VS3+bUfZpEIqwBZXarwUjnR" "nxprkcQ0rfCCdagkGZr/OA7DemK2D8tKLTHsKoEEWNImo6/pXDkFxAAAAAAAAAA" "AAAAAAQAAABhrZXlzL2VjZHNhLXNoYTItbmlzdHA1MjEAAAAAAAAAAAAAAAD///" "///////wAAAAAAAACCAAAAFXBlcm1pdC1YMTEtZm9yd2FyZGluZwAAAAAAAAAXc" "GVybWl0LWFnZW50LWZvcndhcmRpbmcAAAAAAAAAFnBlcm1pdC1wb3J0LWZvcndh" "cmRpbmcAAAAAAAAACnBlcm1pdC1wdHkAAAAAAAAADnBlcm1pdC11c2VyLXJjAAA" "AAAAAAAAAAACsAAAAE2VjZHNhLXNoYTItbmlzdHA1MjEAAAAIbmlzdHA1MjEAAA" "CFBAC6hFVXM1XEg/7qKkp5sLZuANGQVW88b5pPn2ZcK0td9IQstLH6BwWuZ6MPE" "ogiDlvx9HD1BaKGBBfkxgOY8NGFzQHbjU9eTWH3gt0RATDbZsij1pSkFPnAXdU9" "SjfogYloI2xdHaTCgWp3zgsUV+BBQ0QGGv2MqqcOmrF0f5YEJeOffAAAAKcAAAA" "TZWNkc2Etc2hhMi1uaXN0cDUyMQAAAIwAAABCAT+vSOYPuYVTDopDW08576d5Sb" "edXQMOu1op4CQIm98VKtAXvu5dfioi5VYAqpte8M+UxEMOMiQWJp+U9exYf6LuA" "AAAQgEzkIpX3yKXPaPcK17mNx40ujEDitm4ARmbhAge0sFhZtf7YIgI55b6vkI8" "JvMJkzQCBF1cpNOaIpVh1nFZNBphMQ==" ), "ecdsa-sha2-nistp521": ( "AAAAE2VjZHNhLXNoYTItbmlzdHA1MjEAAAAIbmlzdHA1MjEAAACFBAHosAOHAI1" "ZkerbKYQ72S6uit1u77PCj/OalZtXgsxv0TTAZB273puG2X94CQ8yyNHcby87zF" "ZHdv5BSKyZ/cyREAAeiAcSakop9VS3+bUfZpEIqwBZXarwUjnRnxprkcQ0rfCCd" "agkGZr/OA7DemK2D8tKLTHsKoEEWNImo6/pXDkFxA==" ), "sk-ecdsa-sha2-nistp256-cert-v01@openssh.com": ( "AAAAIHNzaC1lZDI1NTE5LWNlcnQtdjAxQG9wZW5zc2guY29tAAAAIIxzuxl4z3u" "wAIslne8Huft+1n1IhHAlNbWZkQyyECCGAAAAIFOG6kY7Rf4UtCFvPwKgo/BztX" "ck2xC4a2WyA34XtIwZAAAAAAAAAAgAAAACAAAABmp1bGl1cwAAABIAAAAFaG9zd" "DEAAAAFaG9zdDIAAAAANowB8AAAAABNHmBwAAAAAAAAAAAAAAAAAAAAMwAAAAtz" "c2gtZWQyNTUxOQAAACBThupGO0X+FLQhbz8CoKPwc7V3JNsQuGtlsgN+F7SMGQA" "AAFMAAAALc3NoLWVkMjU1MTkAAABABGTn+Bmz86Ajk+iqKCSdP5NClsYzn4alJd" "0V5bizhP0Kumc/HbqQfSt684J1WdSzih+EjvnTgBhK9jTBKb90AQ==" ), "sk-ecdsa-sha2-nistp256@openssh.com": ( "AAAAInNrLWVjZHNhLXNoYTItbmlzdHAyNTZAb3BlbnNzaC5jb20AAAAIbmlzdHA" "yNTYAAABBBIELQJ2DgvaX1yQlKFokfWM2suuaCFI2qp0eJodHyg6O4ifxc3XpRK" "d1OS8dNYQtE/YjdXSrA+AOnMF5ns2Nkx4AAAAEc3NoOg==" ), "sk-ssh-ed25519-cert-v01@openssh.com": ( "AAAAIHNzaC1lZDI1NTE5LWNlcnQtdjAxQG9wZW5zc2guY29tAAAAIIxzuxl4z3u" "wAIslne8Huft+1n1IhHAlNbWZkQyyECCGAAAAIFOG6kY7Rf4UtCFvPwKgo/BztX" "ck2xC4a2WyA34XtIwZAAAAAAAAAAgAAAACAAAABmp1bGl1cwAAABIAAAAFaG9zd" "DEAAAAFaG9zdDIAAAAANowB8AAAAABNHmBwAAAAAAAAAAAAAAAAAAAAMwAAAAtz" "c2gtZWQyNTUxOQAAACBThupGO0X+FLQhbz8CoKPwc7V3JNsQuGtlsgN+F7SMGQA" "AAFMAAAALc3NoLWVkMjU1MTkAAABABGTn+Bmz86Ajk+iqKCSdP5NClsYzn4alJd" "0V5bizhP0Kumc/HbqQfSt684J1WdSzih+EjvnTgBhK9jTBKb90AQ==" ), "sk-ssh-ed25519@openssh.com": ( "AAAAGnNrLXNzaC1lZDI1NTE5QG9wZW5zc2guY29tAAAAICFo/k5LU8863u66YC9" "eUO2170QduohPURkQnbLa/dczAAAABHNzaDo=" ), "ssh-dss-cert-v01@openssh.com": ( "AAAAHHNzaC1kc3MtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgdTlbNU9Hn9Qng3F" "HxwH971bxCIoq1ern/QWFFDWXgmYAAACBAPqS600VGwdPAQC/p3f0uGyrLVql0c" "Fn1zYd/JGvtabKnIYjLaYprje/NcjwI3CZFJiz4Dp3S8kLs+X5/1DMn/Tg1Y4D4" "yLB+6vCtHcJF7rVBFhvw/KZwc7G54ez3khyOtsg82fzpyOc8/mq+/+C5TMKO7DD" "jMF0k5emWKCsa3ZfAAAAFQCjA/+dKkMu4/CWjJPtfl7YNaStNQAAAIEA7uX1BVV" "tJKjLmWrpw62+l/xSXA5rr7MHBuWjiCYV3VHBfXJaQDyRDtGuEJKDwdzqYgacpG" "ApGWL/cuBtJ9nShsUl6GRG0Ra03g+Hx9VR5LviJBsjAVB4qVgciU1NGga0Bt2Le" "cd1X4EGQRBzVXeuOpiqGM6jP/I2yDMs0Pboet0AAACBAOdXpyfmobEBaOqZAuvg" "j1P0uhjG2P31Ufurv22FWPBU3A9qrkxbOXwE0LwvjCvrsQV/lrYhJz/tiys40Ve" "ahulWZE5SAHMXGIf95LiLSgaXMjko7joot+LK84ltLymwZ4QMnYjnZSSclf1Uuy" "QMcUtb34+I0u9Ycnyhp2mSFsQtAAAAAAAAAAYAAAACAAAABmp1bGl1cwAAABIAA" "AAFaG9zdDEAAAAFaG9zdDIAAAAANowB8AAAAABNHmBwAAAAAAAAAAAAAAAAAAAA" "MwAAAAtzc2gtZWQyNTUxOQAAACBThupGO0X+FLQhbz8CoKPwc7V3JNsQuGtlsgN" "+F7SMGQAAAFMAAAALc3NoLWVkMjU1MTkAAABAh/z1LIdNL1b66tQ8t9DY9BTB3B" "QKpTKmc7ezyFKLwl96yaIniZwD9Ticdbe/8i/Li3uCFE3EAt8NAIv9zff8Bg==" ), "ssh-dss": ( "AAAAB3NzaC1kc3MAAACBAPqS600VGwdPAQC/p3f0uGyrLVql0cFn1zYd/JGvtab" "KnIYjLaYprje/NcjwI3CZFJiz4Dp3S8kLs+X5/1DMn/Tg1Y4D4yLB+6vCtHcJF7" "rVBFhvw/KZwc7G54ez3khyOtsg82fzpyOc8/mq+/+C5TMKO7DDjMF0k5emWKCsa" "3ZfAAAAFQCjA/+dKkMu4/CWjJPtfl7YNaStNQAAAIEA7uX1BVVtJKjLmWrpw62+" "l/xSXA5rr7MHBuWjiCYV3VHBfXJaQDyRDtGuEJKDwdzqYgacpGApGWL/cuBtJ9n" "ShsUl6GRG0Ra03g+Hx9VR5LviJBsjAVB4qVgciU1NGga0Bt2Lecd1X4EGQRBzVX" "euOpiqGM6jP/I2yDMs0Pboet0AAACBAOdXpyfmobEBaOqZAuvgj1P0uhjG2P31U" "furv22FWPBU3A9qrkxbOXwE0LwvjCvrsQV/lrYhJz/tiys40VeahulWZE5SAHMX" "GIf95LiLSgaXMjko7joot+LK84ltLymwZ4QMnYjnZSSclf1UuyQMcUtb34+I0u9" "Ycnyhp2mSFsQt" ), "ssh-ed25519-cert-v01@openssh.com": ( "AAAAIHNzaC1lZDI1NTE5LWNlcnQtdjAxQG9wZW5zc2guY29tAAAAIIxzuxl4z3u" "wAIslne8Huft+1n1IhHAlNbWZkQyyECCGAAAAIFOG6kY7Rf4UtCFvPwKgo/BztX" "ck2xC4a2WyA34XtIwZAAAAAAAAAAgAAAACAAAABmp1bGl1cwAAABIAAAAFaG9zd" "DEAAAAFaG9zdDIAAAAANowB8AAAAABNHmBwAAAAAAAAAAAAAAAAAAAAMwAAAAtz" "c2gtZWQyNTUxOQAAACBThupGO0X+FLQhbz8CoKPwc7V3JNsQuGtlsgN+F7SMGQA" "AAFMAAAALc3NoLWVkMjU1MTkAAABABGTn+Bmz86Ajk+iqKCSdP5NClsYzn4alJd" "0V5bizhP0Kumc/HbqQfSt684J1WdSzih+EjvnTgBhK9jTBKb90AQ==" ), "ssh-ed25519": ( "AAAAC3NzaC1lZDI1NTE5AAAAIFOG6kY7Rf4UtCFvPwKgo/BztXck2xC4a2WyA34XtIwZ" ), "ssh-rsa-cert-v01@openssh.com": ( "AAAAHHNzaC1yc2EtY2VydC12MDFAb3BlbnNzaC5jb20AAAAg98LhS2EHxLOWCLo" "pZPwHdg/RJXusnkOqQXSc9R7aITkAAAADAQABAAAAgQDLV5lUTt7FrADseB/CGh" "EZzpoojjEW5y8+ePvLppmK3MmMI18ud6vxzpK3bwZLYkVSyfJYI0HmIuGhdu7yM" "rW6wb84gbq8C31Xoe9EORcIUuGSvDKdNSM1SjlhDquRblDFB8kToqXyx1lqrXec" "XylxIUOL0jE+u0rU1967pDJx+wAAAAAAAAAFAAAAAgAAAAZqdWxpdXMAAAASAAA" "ABWhvc3QxAAAABWhvc3QyAAAAADaMAfAAAAAATR5gcAAAAAAAAAAAAAAAAAAAAD" "MAAAALc3NoLWVkMjU1MTkAAAAgU4bqRjtF/hS0IW8/AqCj8HO1dyTbELhrZbIDf" "he0jBkAAABTAAAAC3NzaC1lZDI1NTE5AAAAQI3QGlUCzC07KorupxpDkkGy6tni" "aZ8EvBflzvv+itXWNchGvfUeHmVT6aX0sRqehdz/lR+GmXRoZBhofwh0qAM=" ), "ssh-rsa": ( "AAAAB3NzaC1yc2EAAAADAQABAAAAgQDLV5lUTt7FrADseB/CGhEZzpoojjEW5y8" "+ePvLppmK3MmMI18ud6vxzpK3bwZLYkVSyfJYI0HmIuGhdu7yMrW6wb84gbq8C3" "1Xoe9EORcIUuGSvDKdNSM1SjlhDquRblDFB8kToqXyx1lqrXecXylxIUOL0jE+u" "0rU1967pDJx+w==" ), "ssh-xmss-cert-v01@openssh.com": ( "AAAAHXNzaC14bXNzLWNlcnQtdjAxQG9wZW5zc2guY29tAAAAIM2UD0IH+Igsekq" "xjTO5f36exX4WGRMCtDGPjwfbXblxAAAAFVhNU1NfU0hBMi0yNTZfVzE2X0gxMA" "AAAEDI83/K5JMOy0BMJgQypRdz35ApAnoQinMJ8ZMoZPaEJF8Z4rANQlfzaAXum" "N3RDU5CGIUGGw+WJ904G/wwEq9CAAAAAAAAAAAAAAABAAAACWtleXMveG1zcwAA" "AAAAAAAAAAAAAP//////////AAAAAAAAAIIAAAAVcGVybWl0LVgxMS1mb3J3YXJ" "kaW5nAAAAAAAAABdwZXJtaXQtYWdlbnQtZm9yd2FyZGluZwAAAAAAAAAWcGVybW" "l0LXBvcnQtZm9yd2FyZGluZwAAAAAAAAAKcGVybWl0LXB0eQAAAAAAAAAOcGVyb" "Wl0LXVzZXItcmMAAAAAAAAAAAAAAHUAAAAUc3NoLXhtc3NAb3BlbnNzaC5jb20A" "AAAVWE1TU19TSEEyLTI1Nl9XMTZfSDEwAAAAQA+irIyT2kaOd07YWZT/QItzNBZ" "kUYwnqZJihQ7BxuyiDP4HEFbnfYnnIZXx9Asyi7vDyZRvi+AMSOzmMSq4JnkAAA" "ngAAAAFHNzaC14bXNzQG9wZW5zc2guY29tAAAJxAAAAAAFjaKTDc+7Hu2uFGIab" "3NAku8HbbGtrq/uGXOxmqxu4RaLqmwofl5iXk3nMwWEhQAb99vAc9D9ZFtfxJO4" "STYUTjbj4BxToov/uvbYfE5VeO6sMvkGglgh9YHkCTAItsG8EmGT1SIPfKYzLlN" "jvUlbcv0PaPFMJ0wzS9mNfuRf+KUhf3dxQ6zaMrBH3KEJ8Me2kNjhnh6rNPROeI" "N+IcStSKsydYuiySGKS/orsH38XysuK5QqLizbHJY3cqLbkW9LsIijb+pfEJh4Y" "bOoAbraWAv9ySnWCyRhvw2x8uJ0ZM+p5WSRiZfB3JxCpOhHgiKa9TdmdjnAtnED" "zqKOj/gM7y9mesn5ydQI0bENOGymlw0ThUGKbXMxn87Hc9dDPURUBmoO3NGjPDf" "7meS39A1ZEGtCe/pbZU9iwxqGx4wJYvB4lutRP2tYC1pA6hjQCcHibvxl5iqj+1" "jRjwPr8dbTm4PdETW/7JDSVQXKjxOT0kRLHLelJNeviGx5zSHR5PtnUP3nOBMme" "hk9DwcQW9vfKeWSnu9CMnF8xvYJxoPKQwmz0TKo+YVOUnc9/Ma+Ykseof9/W+rk" "USQGELc4x7XE5XBKYZZP2PmtxirQ3qTWFw+CeTX2Oa+jPYkzOa7jgmHJ3Fi9Xqw" "3L844vRl97e28GmwS0M1SXH+ohES0mO4EcrGh5OLyXBaRTV5QMo+4Bg6FH/HwEn" "gG1mdEOAqvctK2QC70c4lHGzfexqwQ2U6WUADPcd/BLOE8Noj1EiXYwZrSA1okZ" "FYnS/b89Uo51D2FE4A33V4gcxAglGzVNtrPulkguNT9B4jjNgdIwkTBL9k3ujkG" "og6pyYjZ0J5Jp5XPBn+y0LqrpOdZijzrc1OJbX59tTeIbDkM7Fw8As4a03hQPDU" "FTOdyMHgLnuLhLXOcqIjvW5axZL/Kx3UET8wrSHizPoa6NErCG4v5mC2M4kBSOW" "In1QV27QMaHkL/ZAa3mPsW5iFZtOVEGzw2BW4MZs0qOrcloCENZzOHiMBroKEkH" "AbzX6D1FLwml2JpXq4JXlCrdIiFm4+co5ygnWPqb4QGzMlcbjW/x/A16TthNuok" "wwlmK5ndKZ76LahyGKEwx2Nv0D+0xilEC1EldtiYRdBNlcGbU/A5EhH5bQ9KVIH" "wjWm35pRPLl5224//nqvQKhwFCn9otsR35XHXev3IQ0or3HmQxIvSDOwir1l66z" "FFrkyHMWexoucbTBxw1MN3hLb247lcVYJ5+hspJgyoYbfR5RkQVDzhpzskogP7l" "K5t0bphu+f+hpvrca7DAiiIZkcR4R1UUQoRnJPRXyXOxlxwS10b51cP9p9jzvZj" "d2LUs8yx1KXWSxNHo6WmtYONNaUfdX2OB5+QCvPULfLfFeBrqpX6Yp5wQMM5Cup" "k8FEfV07eEgQkVE9nDGKHglWo3kUdOF+XCqWAnXn0b/2bNS9/SSAz6gB1GTFcN/" "QsFGlC0QgbCJbQ7LQM6hilRWupWvN5zZ/+HJyyRHuSs5VnQnKiGbIa6AIhx7mP7" "8T82gKjU3mHLJWMGKcT3cY8R958Gs+w4OT71VJRMw3kK6qk02WCbD5OtbFeC6ib" "KRJKdLK3BzjVs/Fzu3mHVucVby3jpvG1Z8HKspKFhvV7gjFEPu8qHKi4MdAlif/" "KakyPk8yZB/dMfaxh7Kv/WpJuSwWNs7RNh29e+ZG+POxqRPWiHqiVw7P17a4dN7" "nkVOawdBEyxI4NAY+4zW+0r0bAy6zNBitBvkq3IXfr3De6Upex52sPHvK04PXoV" "RI6gjnpPSbLLjpSpcHPKgB7DWefLfhd63BUQbc57D8zm8Jd6qtmzcSKn+wz5/zT" "0I6v9I4a+DOjjyqpPpzzNU76pt+Y8SuBgHzMm1vcAdNWlbQrqtScvm0T9AkYni6" "47vSh77uwRZKDtMCMSU151tVUavXhtLYLZ6/ll5NhMXkkx8//i7pk1OBjN5LHVQ" "0QeimRmavlXU1dJ2rwsFAV+9dDdJXUNOq3VLTo9FrbOzZiWtzzjkJpVJAFREnBn" "yIDBK5AXtXE1RzfzaBHzbI2e2kO3t+CSNLWYMFYHBDqaeICYQ9+I9aO/8hnzVSo" "fp+8IfWO8iJhppqynUniicW2oCzrn4oczzYNEjImt8CGY7g90GxWfX+ZgXMJfy/" "bQiFQL3dZvVypDHEbFoIGz+sxkL83xrP4MZV1V9Wwa64lDXYv01Kp4kQXmmnAZY" "KlxBoWqYDXLeLLguSOZxDSCIDpd+YPm39wQ3wOysHW2fmsWtp6FPPlQRUYjsGIP" "lfrkJzpoeaPKDtF1m+mOULfEh9kvTKCmKRi385T9ON39D97eWqaM4CCfUGImvdR" "DlZLXvjmaAh5BVJ8VJxk75OkP14vWFFlTMv0/k4BYLDKsrNqCREC/G9nQBGcD2D" "CLwC2zPNaX2Y9dnyDs2csjN1ibsYttUMnXMgBcnCOkIkVS496Bpc0jQMf35GUgb" "PSyliwqCoXjEBP/2eyq0VLFKQ0fXGsHWvElT+Y/7RYNTiYVWttFMxN5H/2EGcgn" "lfNHLpQvXH9u/3YminS9GX30hQ7jFhpHXxkK8gZ1mpHL9K3pfKS3lG6EF9wQ23O" "qS8m995SG3dp3MzmywxXen/ukXx6bDiEl5VaOvdRUcbhr5Eb3exVDfdWiaJdTYF" "WfIfJOWx88drB3J9vFwjmuaoNEOjFsoNAMYthYOxXraXaJblvmUKz6tJ3T8/G7x" "B9QGYNBsOqBolKoKHBtsWCosLdWhEZr9VFFh2AJrOW1fx24CIkHnvfTtwYORvQq" "Ckuq2bZS1EOdsFkU/X5gwPl6gSUTNhV3IooXkBFL3iBEbfZ6JpQHVVyIuNWjIyN" "b2liCn9Nn0VHeNMMRLl7uyw4eKlOX2ogom8SLvihYxcJoqlCwtehpLsKsU4iwME" "PmDteW5GBGf4GbnqPFkpIT5ed1jGhdZt/dpsp+v6QhYH1uX4pPxdkdnuc84/yb9" "k4SQdKBJ+l3KZkfIxApNWOZqicJfz/eWwS/15hiamRKRuiiUV2zS1V+l8bV7g9O" "gy5scPBMONxtfFlGEKikZKurFmzboCOGQKRBEUCpsY44IAp443h59pQdVIb0YAS" "kfp2xKHwYij6ELRNdH5MrlFa3bNTskGO4k5XDR4cl/Sma2SXgBKb5XjTtlNmCQG" "Gv6lOW7pGXNhs5wfd8K9Ukm6KeLTIlYn1iiKM37YQpa+4JQYljCYhumbqNCkPTZ" "rNYClh8fQEQ8XuOCDpomMWu58YOTfbZNMDWs/Ou7RfCjX+VNwjPShDK9joMwWKc" "Jy3QalZbaoWtcyyvXxR2sqhVR9F7Cmasq4=" ), "ssh-xmss@openssh.com": ( "AAAAFHNzaC14bXNzQG9wZW5zc2guY29tAAAAFVhNU1NfU0hBMi0yNTZfVzE2X0g" "xMAAAAECqptWnK94d+Sj2xcdTu8gz+75lawZoLSZFqC5IhbYuT/Z3oBZCim6yt+" "HAmk6MKldl3Fg+74v4sR/SII0I0Jv/" ), } KEY_TYPES = list(VALID_CONTENT.keys()) TEST_OPTIONS = ( "no-port-forwarding,no-agent-forwarding,no-X11-forwarding," 'command="echo \'Please login as the user "ubuntu" rather than the' 'user "root".\';echo;sleep 10"' ) class TestAuthKeyLineParser: @pytest.mark.parametrize("with_options", [True, False]) @pytest.mark.parametrize("with_comment", [True, False]) @pytest.mark.parametrize("ktype", KEY_TYPES) def test_parse(self, ktype, with_comment, with_options): content = VALID_CONTENT[ktype] comment = "user-%s@host" % ktype options = TEST_OPTIONS line_args = [] if with_options: line_args.append(options) line_args.extend( [ ktype, content, ] ) if with_comment: line_args.append(comment) line = " ".join(line_args) key = ssh_util.AuthKeyLineParser().parse(line) assert key.base64 == content assert key.keytype == ktype if with_options: assert key.options == options else: assert key.options is None if with_comment: assert key.comment == comment else: assert key.comment == "" def test_parse_with_options_passed_in(self): # test key line with key type and base64 only parser = ssh_util.AuthKeyLineParser() baseline = " ".join(("rsa", VALID_CONTENT["rsa"], "user@host")) myopts = "no-port-forwarding,no-agent-forwarding" key = parser.parse("allowedopt" + " " + baseline) assert key.options == "allowedopt" key = parser.parse("overridden_opt " + baseline, options=myopts) assert key.options == myopts def test_parse_invalid_keytype(self): parser = ssh_util.AuthKeyLineParser() key = parser.parse(" ".join(["badkeytype", VALID_CONTENT["rsa"]])) assert not key.valid() class TestUpdateAuthorizedKeys: @pytest.mark.parametrize( "new_entries", [ ( [ " ".join(("rsa", VALID_CONTENT["rsa"], "new_comment1")), ] ), pytest.param( [ " ".join(("rsa", VALID_CONTENT["rsa"], "new_comment1")), "xxx-invalid-thing1", "xxx-invalid-blob2", ], id="skip-invalid-entries", ), ], ) def test_new_keys_replace(self, new_entries): """new entries with the same base64 should replace old.""" orig_entries = [ " ".join(("rsa", VALID_CONTENT["rsa"], "orig_comment1")), " ".join(("dsa", VALID_CONTENT["dsa"], "orig_comment2")), ] expected = "\n".join([new_entries[0], orig_entries[1]]) + "\n" parser = ssh_util.AuthKeyLineParser() found = ssh_util.update_authorized_keys( [parser.parse(p) for p in orig_entries], [parser.parse(p) for p in new_entries], ) assert expected == found @mock.patch(M_PATH + "util.load_file") @mock.patch(M_PATH + "os.path.isfile") class TestParseSSHConfig: @pytest.mark.parametrize( "is_file, file_content", [ pytest.param(True, ("",), id="empty-file"), pytest.param(False, IOError, id="not-a-file"), ], ) def test_dummy_file(self, m_is_file, m_load_file, is_file, file_content): m_is_file.return_value = is_file m_load_file.side_effect = file_content ret = ssh_util.parse_ssh_config("notmatter") assert [] == ret @pytest.mark.parametrize( "file_content", [ pytest.param(["# This is a comment"], id="comment_line"), pytest.param( ["# This is a comment", "# This is another comment"], id="two-comment_lines", ), ], ) def test_comment_line(self, m_is_file, m_load_file, file_content): m_is_file.return_value = True m_load_file.return_value = "\n".join(file_content) ret = ssh_util.parse_ssh_config("some real file") assert len(file_content) == len(ret) assert file_content[0] == ret[0].line def test_blank_lines(self, m_is_file, m_load_file): m_is_file.return_value = True lines = ["", "\t", " "] m_load_file.return_value = "\n".join(lines) ret = ssh_util.parse_ssh_config("some real file") assert len(lines) == len(ret) for line in ret: assert "" == line.line @pytest.mark.parametrize( "file_content, expected_key, expected_value", [ pytest.param("foo bar", "foo", "bar", id="lower-case"), pytest.param("Foo Bar", "foo", "Bar", id="upper-case"), pytest.param("foo=bar", "foo", "bar", id="lower-case-with-equals"), pytest.param("Foo=bar", "foo", "bar", id="upper-case-with-equals"), ], ) def test_case_config( self, m_is_file, m_load_file, file_content, expected_key, expected_value, ): m_is_file.return_value = True m_load_file.return_value = file_content ret = ssh_util.parse_ssh_config("some real file") assert 1 == len(ret) assert expected_key == ret[0].key assert expected_value == ret[0].value def test_duplicated_keys(self, m_is_file, m_load_file): file_content = [ "HostCertificate /data/ssh/ssh_host_rsa_cert", "HostCertificate /data/ssh/ssh_host_ed25519_cert", ] m_is_file.return_value = True m_load_file.return_value = "\n".join(file_content) ret = ssh_util.parse_ssh_config("some real file") assert len(file_content) == len(ret) for i in range(len(file_content)): assert file_content[i] == ret[i].line class TestUpdateSshConfigLines: """Test the update_ssh_config_lines method.""" exlines = [ "#PasswordAuthentication yes", "UsePAM yes", "# Comment line", "AcceptEnv LANG LC_*", "X11Forwarding no", ] pwauth = "PasswordAuthentication" def check_line(self, line, opt, val): assert line.key == opt.lower() assert line.value == val assert opt in str(line) assert val in str(line) @pytest.mark.parametrize( "key, value", [ pytest.param("MyKey", "MyVal", id="new_option_added"), pytest.param( pwauth, "no", id="commented_out_not_updated_but_appended" ), ], ) def test_update_ssh_config_lines(self, key, value): lines = ssh_util.parse_ssh_config_lines(list(self.exlines)) result = ssh_util.update_ssh_config_lines(lines, {key: value}) assert [key] == result self.check_line(lines[-1], key, value) def test_option_without_value(self): """Implementation only accepts key-value pairs.""" extended_exlines = self.exlines.copy() denyusers_opt = "DenyUsers" extended_exlines.append(denyusers_opt) lines = ssh_util.parse_ssh_config_lines(list(extended_exlines)) assert denyusers_opt not in str(lines) def test_single_option_updated(self): """A single update should have change made and line updated.""" opt, val = ("UsePAM", "no") lines = ssh_util.parse_ssh_config_lines(list(self.exlines)) result = ssh_util.update_ssh_config_lines(lines, {opt: val}) assert [opt] == result self.check_line(lines[1], opt, val) def test_multiple_updates_with_add(self): """Verify multiple updates some added some changed, some not.""" updates = { "UsePAM": "no", "X11Forwarding": "no", "NewOpt": "newval", "AcceptEnv": "LANG ADD LC_*", } lines = ssh_util.parse_ssh_config_lines(list(self.exlines)) result = ssh_util.update_ssh_config_lines(lines, updates) assert set(["UsePAM", "NewOpt", "AcceptEnv"]) == set(result) self.check_line(lines[3], "AcceptEnv", updates["AcceptEnv"]) def test_return_empty_if_no_changes(self): """If there are no changes, then return should be empty list.""" updates = {"UsePAM": "yes"} lines = ssh_util.parse_ssh_config_lines(list(self.exlines)) result = ssh_util.update_ssh_config_lines(lines, updates) assert [] == result assert self.exlines == [str(line) for line in lines] def test_keycase_not_modified(self): """Original case of key should not be changed on update. This behavior is to keep original config as much intact as can be.""" updates = {"usepam": "no"} lines = ssh_util.parse_ssh_config_lines(list(self.exlines)) result = ssh_util.update_ssh_config_lines(lines, updates) assert ["usepam"] == result assert "UsePAM no" == str(lines[1]) class TestUpdateSshConfig: cfgdata = "\n".join(["#Option val", "MyKey ORIG_VAL", ""]) def test_modified(self, tmpdir): mycfg = tmpdir.join("ssh_config_1") util.write_file(mycfg, self.cfgdata) ret = ssh_util.update_ssh_config({"MyKey": "NEW_VAL"}, mycfg) assert True is ret found = util.load_file(mycfg) assert self.cfgdata.replace("ORIG_VAL", "NEW_VAL") == found # assert there is a newline at end of file (LP: #1677205) assert "\n" == found[-1] def test_not_modified(self, tmpdir): mycfg = tmpdir.join("ssh_config_2") util.write_file(mycfg, self.cfgdata) with patch("cloudinit.ssh_util.util.write_file") as m_write_file: ret = ssh_util.update_ssh_config({"MyKey": "ORIG_VAL"}, mycfg) assert False is ret assert self.cfgdata == util.load_file(mycfg) m_write_file.assert_not_called() def test_without_include(self, tmpdir): mycfg = tmpdir.join("sshd_config") cfg = "X Y" util.write_file(mycfg, cfg) assert ssh_util.update_ssh_config({"key": "value"}, mycfg) assert "X Y\nkey value\n" == util.load_file(mycfg) expected_conf_file = f"{mycfg}.d/50-cloud-init.conf" assert not os.path.isfile(expected_conf_file) @pytest.mark.parametrize( "cfg", ["Include {mycfg}.d/*.conf", "Include {mycfg}.d/*.conf # comment"], ) def test_with_include(self, cfg, tmpdir): mycfg = tmpdir.join("sshd_config") util.write_file(mycfg, cfg.format(mycfg=mycfg)) assert ssh_util.update_ssh_config({"key": "value"}, mycfg) expected_conf_file = f"{mycfg}.d/50-cloud-init.conf" assert os.path.isfile(expected_conf_file) assert 0o600 == stat.S_IMODE(os.stat(expected_conf_file).st_mode) assert "key value\n" == util.load_file(expected_conf_file) def test_with_commented_include(self, tmpdir): mycfg = tmpdir.join("sshd_config") cfg = f"# Include {mycfg}.d/*.conf" util.write_file(mycfg, cfg) assert ssh_util.update_ssh_config({"key": "value"}, mycfg) assert f"{cfg}\nkey value\n" == util.load_file(mycfg) expected_conf_file = f"{mycfg}.d/50-cloud-init.conf" assert not os.path.isfile(expected_conf_file) def test_with_other_include(self, tmpdir): mycfg = tmpdir.join("sshd_config") cfg = f"Include other_{mycfg}.d/*.conf" util.write_file(mycfg, cfg) assert ssh_util.update_ssh_config({"key": "value"}, mycfg) assert f"{cfg}\nkey value\n" == util.load_file(mycfg) expected_conf_file = f"{mycfg}.d/50-cloud-init.conf" assert not os.path.isfile(expected_conf_file) assert not os.path.isfile(f"other_{mycfg}.d/50-cloud-init.conf") class TestAppendSshConfig: cfgdata = "\n".join(["#Option val", "MyKey ORIG_VAL", ""]) @mock.patch(M_PATH + "_ensure_cloud_init_ssh_config_file") def test_append_ssh_config(self, m_ensure_cloud_init_config_file, tmpdir): mycfg = tmpdir.join("ssh_config") util.write_file(mycfg, self.cfgdata) m_ensure_cloud_init_config_file.return_value = str(mycfg) ssh_util.append_ssh_config( [("MyKey", "NEW_VAL"), ("MyKey", "NEW_VAL_2")], mycfg ) found = util.load_file(mycfg) expected_cfg = dedent( """\ #Option val MyKey ORIG_VAL MyKey NEW_VAL MyKey NEW_VAL_2 """ ) assert expected_cfg == found # assert there is a newline at end of file (LP: #1677205) assert "\n" == found[-1] class TestBasicAuthorizedKeyParse: @pytest.mark.parametrize( "value, homedir, username, expected_rendered", [ pytest.param( "/opt/%u/keys", "/home/bobby", "bobby", ["/opt/bobby/keys"], id="user", ), pytest.param( "/opt/%u", "/home/bobby", "bobby", ["/opt/bobby"], id="user_file", ), pytest.param( "/opt/%u/%u", "/home/bobby", "bobby", ["/opt/bobby/bobby"], id="user_file_2", ), pytest.param( "/keys/path1 /keys/path2", "/home/bobby", "bobby", ["/keys/path1", "/keys/path2"], id="multiple", ), pytest.param( "/keys/path1 /keys/%u", "/home/bobby", "bobby", ["/keys/path1", "/keys/bobby"], id="multiple_2", ), pytest.param( ".secret/keys", "/home/bobby", "bobby", ["/home/bobby/.secret/keys"], id="relative", ), pytest.param( "%h/.keys", "/homedirs/bobby", "bobby", ["/homedirs/bobby/.keys"], id="home", ), pytest.param( "%h/.keys .secret/keys /keys/path1 /opt/%u/keys", "/homedirs/bobby", "bobby", [ "/homedirs/bobby/.keys", "/homedirs/bobby/.secret/keys", "/keys/path1", "/opt/bobby/keys", ], id="all", ), ], ) def test_render_authorizedkeysfile_paths( self, value, homedir, username, expected_rendered ): assert expected_rendered == ssh_util.render_authorizedkeysfile_paths( value, homedir, username ) class TestMultipleSshAuthorizedKeysFile: def create_fake_users( self, names, mock_permissions, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, users, tmpdir, ): homes = [] root = str(tmpdir.join("root")) fpw = FakePwEnt(pw_name="root", pw_dir=root) users["root"] = fpw for name in names: home = str(tmpdir.join("home", name)) fpw = FakePwEnt(pw_name=name, pw_dir=home) users[name] = fpw homes.append(home) m_get_permissions.side_effect = partial( mock_get_permissions, mock_permissions ) m_get_owner.side_effect = partial(mock_get_owner, mock_permissions) m_get_group.side_effect = partial(mock_get_group, mock_permissions) m_getpwnam.side_effect = partial(mock_getpwnam, users) return homes def create_user_authorized_file(self, home, filename, content_key, keys): user_ssh_folder = os.path.join(home, ".ssh") # /tmp/home//.ssh/authorized_keys = content_key authorized_keys = str(os.path.join(user_ssh_folder, filename)) util.write_file(authorized_keys, VALID_CONTENT[content_key]) keys[authorized_keys] = content_key return authorized_keys def create_global_authorized_file( self, filename, content_key, keys, tmpdir ): authorized_keys = str(tmpdir.join(filename)) util.write_file(authorized_keys, VALID_CONTENT[content_key]) keys[authorized_keys] = content_key return authorized_keys def create_sshd_config(self, authorized_keys_files, tmpdir): sshd_config = str(tmpdir.join("sshd_config")) util.write_file( sshd_config, "AuthorizedKeysFile " + authorized_keys_files ) return sshd_config def execute_and_check(self, user, sshd_config, solution, keys): (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys( user, sshd_config ) content = ssh_util.update_authorized_keys(auth_key_entries, []) assert auth_key_fn == solution for path, key in keys.items(): if path == solution: assert VALID_CONTENT[key] in content else: assert VALID_CONTENT[key] not in content @pytest.mark.parametrize("inverted", [False, True]) @patch("cloudinit.ssh_util.pwd.getpwnam") @patch("cloudinit.util.get_permissions") @patch("cloudinit.util.get_owner") @patch("cloudinit.util.get_group") def test_single_user_two_local_files( self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, inverted, tmpdir, ): user_bobby = "bobby" keys = {} users = {} mock_permissions = { tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700), tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700), tmpdir.join("home", "bobby", ".ssh", "user_keys"): ( "bobby", "bobby", 0o600, ), tmpdir.join("home", "bobby", ".ssh", "authorized_keys"): ( "bobby", "bobby", 0o600, ), } homes = self.create_fake_users( [user_bobby], mock_permissions, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, users, tmpdir, ) home = homes[0] # /tmp/home/bobby/.ssh/authorized_keys = rsa authorized_keys = self.create_user_authorized_file( home, "authorized_keys", "rsa", keys ) # /tmp/home/bobby/.ssh/user_keys = dsa user_keys = self.create_user_authorized_file( home, "user_keys", "dsa", keys ) # /tmp/sshd_config if not inverted: options = f"{authorized_keys} {user_keys}" else: options = f"{user_keys} {authorized_keys}" sshd_config = self.create_sshd_config(options, tmpdir) if not inverted: exec_args = (user_bobby, sshd_config, authorized_keys, keys) else: exec_args = (user_bobby, sshd_config, user_keys, keys) self.execute_and_check(*exec_args) @pytest.mark.parametrize("inverted", [False, True]) @patch("cloudinit.ssh_util.pwd.getpwnam") @patch("cloudinit.util.get_permissions") @patch("cloudinit.util.get_owner") @patch("cloudinit.util.get_group") def test_single_user_local_global_files( self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, inverted, tmpdir, ): user_bobby = "bobby" keys = {} users = {} mock_permissions = { tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700), tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700), tmpdir.join("home", "bobby", ".ssh", "user_keys"): ( "bobby", "bobby", 0o600, ), tmpdir.join("home", "bobby", ".ssh", "authorized_keys"): ( "bobby", "bobby", 0o600, ), } homes = self.create_fake_users( [user_bobby], mock_permissions, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, users, tmpdir, ) home = homes[0] # /tmp/home/bobby/.ssh/authorized_keys = rsa authorized_keys = self.create_user_authorized_file( home, "authorized_keys", "rsa", keys ) # /tmp/home/bobby/.ssh/user_keys = dsa user_keys = self.create_user_authorized_file( home, "user_keys", "dsa", keys ) authorized_keys_global = self.create_global_authorized_file( "etc/ssh/authorized_keys", "ecdsa", keys, tmpdir ) if not inverted: options = f"{authorized_keys_global} {user_keys} {authorized_keys}" else: options = f"{authorized_keys_global} {authorized_keys} {user_keys}" sshd_config = self.create_sshd_config(options, tmpdir) if not inverted: exec_args = (user_bobby, sshd_config, user_keys, keys) else: exec_args = (user_bobby, sshd_config, authorized_keys, keys) self.execute_and_check(*exec_args) @patch("cloudinit.ssh_util.pwd.getpwnam") @patch("cloudinit.util.get_permissions") @patch("cloudinit.util.get_owner") @patch("cloudinit.util.get_group") def test_single_user_global_file( self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, tmpdir ): user_bobby = "bobby" keys = {} users = {} mock_permissions = { tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700), tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700), tmpdir.join("home", "bobby", ".ssh", "authorized_keys"): ( "bobby", "bobby", 0o600, ), } homes = self.create_fake_users( [user_bobby], mock_permissions, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, users, tmpdir, ) home = homes[0] # /tmp/etc/ssh/authorized_keys = rsa authorized_keys_global = self.create_global_authorized_file( "etc/ssh/authorized_keys", "rsa", keys, tmpdir ) options = "%s" % authorized_keys_global sshd_config = self.create_sshd_config(options, tmpdir) default = "%s/.ssh/authorized_keys" % home self.execute_and_check(user_bobby, sshd_config, default, keys) @patch("cloudinit.ssh_util.pwd.getpwnam") @patch("cloudinit.util.get_permissions") @patch("cloudinit.util.get_owner") @patch("cloudinit.util.get_group") def test_two_users_local_file_standard( self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, tmpdir ): keys = {} users = {} mock_permissions = { tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700), tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700), tmpdir.join("home", "bobby", ".ssh", "authorized_keys"): ( "bobby", "bobby", 0o600, ), tmpdir.join("home", "suzie"): ("suzie", "suzie", 0o700), tmpdir.join("home", "suzie", ".ssh"): ("suzie", "suzie", 0o700), tmpdir.join("home", "suzie", ".ssh", "authorized_keys"): ( "suzie", "suzie", 0o600, ), } user_bobby = "bobby" user_suzie = "suzie" homes = self.create_fake_users( [user_bobby, user_suzie], mock_permissions, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, users, tmpdir, ) home_bobby = homes[0] home_suzie = homes[1] # /tmp/home/bobby/.ssh/authorized_keys = rsa authorized_keys = self.create_user_authorized_file( home_bobby, "authorized_keys", "rsa", keys ) # /tmp/home/suzie/.ssh/authorized_keys = rsa authorized_keys2 = self.create_user_authorized_file( home_suzie, "authorized_keys", "ssh-xmss@openssh.com", keys ) options = ".ssh/authorized_keys" sshd_config = self.create_sshd_config(options, tmpdir) self.execute_and_check(user_bobby, sshd_config, authorized_keys, keys) self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys) @patch("cloudinit.ssh_util.pwd.getpwnam") @patch("cloudinit.util.get_permissions") @patch("cloudinit.util.get_owner") @patch("cloudinit.util.get_group") def test_two_users_local_file_custom( self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, tmpdir ): keys = {} users = {} mock_permissions = { tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700), tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700), tmpdir.join("home", "bobby", ".ssh", "authorized_keys2"): ( "bobby", "bobby", 0o600, ), tmpdir.join("home", "suzie"): ("suzie", "suzie", 0o700), tmpdir.join("home", "suzie", ".ssh"): ("suzie", "suzie", 0o700), tmpdir.join("home", "suzie", ".ssh", "authorized_keys2"): ( "suzie", "suzie", 0o600, ), } user_bobby = "bobby" user_suzie = "suzie" homes = self.create_fake_users( [user_bobby, user_suzie], mock_permissions, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, users, tmpdir, ) home_bobby = homes[0] home_suzie = homes[1] # /tmp/home/bobby/.ssh/authorized_keys2 = rsa authorized_keys = self.create_user_authorized_file( home_bobby, "authorized_keys2", "rsa", keys ) # /tmp/home/suzie/.ssh/authorized_keys2 = rsa authorized_keys2 = self.create_user_authorized_file( home_suzie, "authorized_keys2", "ssh-xmss@openssh.com", keys ) options = ".ssh/authorized_keys2" sshd_config = self.create_sshd_config(options, tmpdir) self.execute_and_check(user_bobby, sshd_config, authorized_keys, keys) self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys) @patch("cloudinit.ssh_util.pwd.getpwnam") @patch("cloudinit.util.get_permissions") @patch("cloudinit.util.get_owner") @patch("cloudinit.util.get_group") def test_two_users_local_global_files( self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, tmpdir ): keys = {} users = {} mock_permissions = { tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700), tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700), tmpdir.join("home", "bobby", ".ssh", "authorized_keys2"): ( "bobby", "bobby", 0o600, ), tmpdir.join("home", "bobby", ".ssh", "user_keys3"): ( "bobby", "bobby", 0o600, ), tmpdir.join("home", "suzie"): ("suzie", "suzie", 0o700), tmpdir.join("home", "suzie", ".ssh"): ("suzie", "suzie", 0o700), tmpdir.join("home", "suzie", ".ssh", "authorized_keys2"): ( "suzie", "suzie", 0o600, ), tmpdir.join("home", "suzie", ".ssh", "user_keys3"): ( "suzie", "suzie", 0o600, ), } user_bobby = "bobby" user_suzie = "suzie" homes = self.create_fake_users( [user_bobby, user_suzie], mock_permissions, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, users, tmpdir, ) home_bobby = homes[0] home_suzie = homes[1] # /tmp/home/bobby/.ssh/authorized_keys2 = rsa self.create_user_authorized_file( home_bobby, "authorized_keys2", "rsa", keys ) # /tmp/home/bobby/.ssh/user_keys3 = dsa user_keys = self.create_user_authorized_file( home_bobby, "user_keys3", "dsa", keys ) # /tmp/home/suzie/.ssh/authorized_keys2 = rsa authorized_keys2 = self.create_user_authorized_file( home_suzie, "authorized_keys2", "ssh-xmss@openssh.com", keys ) # /tmp/etc/ssh/authorized_keys = ecdsa authorized_keys_global = self.create_global_authorized_file( "etc/ssh/authorized_keys2", "ecdsa", keys, tmpdir ) options = "%s %s %%h/.ssh/authorized_keys2" % ( authorized_keys_global, user_keys, ) sshd_config = self.create_sshd_config(options, tmpdir) self.execute_and_check(user_bobby, sshd_config, user_keys, keys) self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys) @patch("cloudinit.util.get_user_groups") @patch("cloudinit.ssh_util.pwd.getpwnam") @patch("cloudinit.util.get_permissions") @patch("cloudinit.util.get_owner") @patch("cloudinit.util.get_group") def test_two_users_local_global_files_badguy( self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, m_get_user_groups, tmpdir, ): keys = {} users = {} mock_permissions = { tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700), tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700), tmpdir.join("home", "bobby", ".ssh", "authorized_keys2"): ( "bobby", "bobby", 0o600, ), tmpdir.join("home", "bobby", ".ssh", "user_keys3"): ( "bobby", "bobby", 0o600, ), tmpdir.join("home", "badguy"): ("root", "root", 0o755), tmpdir.join("home", "badguy", "home"): ("root", "root", 0o755), tmpdir.join("home", "badguy", "home", "bobby"): ( "root", "root", 0o655, ), } user_bobby = "bobby" user_badguy = "badguy" home_bobby, *_ = self.create_fake_users( [user_bobby, user_badguy], mock_permissions, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, users, tmpdir, ) m_get_user_groups.side_effect = mock_get_user_groups # /tmp/home/bobby/.ssh/authorized_keys2 = rsa authorized_keys = self.create_user_authorized_file( home_bobby, "authorized_keys2", "rsa", keys ) # /tmp/home/bobby/.ssh/user_keys3 = dsa user_keys = self.create_user_authorized_file( home_bobby, "user_keys3", "dsa", keys ) # /tmp/home/badguy/home/bobby = "" authorized_keys2 = str(tmpdir.join("home", "badguy", "home", "bobby")) util.write_file(authorized_keys2, "") # /tmp/etc/ssh/authorized_keys = ecdsa authorized_keys_global = self.create_global_authorized_file( "etc/ssh/authorized_keys2", "ecdsa", keys, tmpdir ) # /tmp/sshd_config options = "%s %%h/.ssh/authorized_keys2 %s %s" % ( authorized_keys2, authorized_keys_global, user_keys, ) sshd_config = self.create_sshd_config(options, tmpdir) self.execute_and_check(user_bobby, sshd_config, authorized_keys, keys) self.execute_and_check( user_badguy, sshd_config, authorized_keys2, keys ) @patch("cloudinit.util.get_user_groups") @patch("cloudinit.ssh_util.pwd.getpwnam") @patch("cloudinit.util.get_permissions") @patch("cloudinit.util.get_owner") @patch("cloudinit.util.get_group") def test_two_users_unaccessible_file( self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, m_get_user_groups, tmpdir, ): keys = {} users = {} mock_permissions = { tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700), tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700), tmpdir.join("home", "bobby", ".ssh", "authorized_keys"): ( "bobby", "bobby", 0o600, ), tmpdir.join("etc"): ("root", "root", 0o755), tmpdir.join("etc", "ssh"): ("root", "root", 0o755), tmpdir.join("etc", "ssh", "userkeys"): ("root", "root", 0o700), tmpdir.join("etc", "ssh", "userkeys", "bobby"): ( "bobby", "bobby", 0o600, ), tmpdir.join("etc", "ssh", "userkeys", "badguy"): ( "badguy", "badguy", 0o600, ), tmpdir.join("home", "badguy"): ("badguy", "badguy", 0o700), tmpdir.join("home", "badguy", ".ssh"): ("badguy", "badguy", 0o700), tmpdir.join("home", "badguy", ".ssh", "authorized_keys"): ( "badguy", "badguy", 0o600, ), } user_bobby = "bobby" user_badguy = "badguy" homes = self.create_fake_users( [user_bobby, user_badguy], mock_permissions, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, users, tmpdir, ) m_get_user_groups.side_effect = mock_get_user_groups home_bobby = homes[0] home_badguy = homes[1] # /tmp/home/bobby/.ssh/authorized_keys = rsa authorized_keys = self.create_user_authorized_file( home_bobby, "authorized_keys", "rsa", keys ) # /tmp/etc/ssh/userkeys/bobby = dsa # assume here that we can bypass userkeys, despite permissions self.create_global_authorized_file( "etc/ssh/userkeys/bobby", "dsa", keys, tmpdir ) # /tmp/home/badguy/.ssh/authorized_keys = ssh-xmss@openssh.com authorized_keys2 = self.create_user_authorized_file( home_badguy, "authorized_keys", "ssh-xmss@openssh.com", keys ) # /tmp/etc/ssh/userkeys/badguy = ecdsa self.create_global_authorized_file( "etc/ssh/userkeys/badguy", "ecdsa", keys, tmpdir ) # /tmp/sshd_config options = str( tmpdir.join("etc", "ssh", "userkeys", "%u .ssh", "authorized_keys") ) sshd_config = self.create_sshd_config(options, tmpdir) self.execute_and_check(user_bobby, sshd_config, authorized_keys, keys) self.execute_and_check( user_badguy, sshd_config, authorized_keys2, keys ) @patch("cloudinit.util.get_user_groups") @patch("cloudinit.ssh_util.pwd.getpwnam") @patch("cloudinit.util.get_permissions") @patch("cloudinit.util.get_owner") @patch("cloudinit.util.get_group") def test_two_users_accessible_file( self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, m_get_user_groups, tmpdir, ): keys = {} users = {} mock_permissions = { tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700), tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700), tmpdir.join("home", "bobby", ".ssh", "authorized_keys"): ( "bobby", "bobby", 0o600, ), tmpdir.join("etc"): ("root", "root", 0o755), tmpdir.join("etc", "ssh"): ("root", "root", 0o755), tmpdir.join("etc", "ssh", "userkeys"): ("root", "root", 0o755), tmpdir.join("etc", "ssh", "userkeys", "bobby"): ( "bobby", "bobby", 0o600, ), tmpdir.join("etc", "ssh", "userkeys", "badguy"): ( "badguy", "badguy", 0o600, ), tmpdir.join("home", "badguy"): ("badguy", "badguy", 0o700), tmpdir.join("home", "badguy", ".ssh"): ("badguy", "badguy", 0o700), tmpdir.join("home", "badguy", ".ssh", "authorized_keys"): ( "badguy", "badguy", 0o600, ), } user_bobby = "bobby" user_badguy = "badguy" homes = self.create_fake_users( [user_bobby, user_badguy], mock_permissions, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, users, tmpdir, ) m_get_user_groups.side_effect = mock_get_user_groups home_bobby = homes[0] home_badguy = homes[1] # /tmp/home/bobby/.ssh/authorized_keys = rsa self.create_user_authorized_file( home_bobby, "authorized_keys", "rsa", keys ) # /tmp/etc/ssh/userkeys/bobby = dsa # assume here that we can bypass userkeys, despite permissions authorized_keys = self.create_global_authorized_file( "etc/ssh/userkeys/bobby", "dsa", keys, tmpdir ) # /tmp/home/badguy/.ssh/authorized_keys = ssh-xmss@openssh.com self.create_user_authorized_file( home_badguy, "authorized_keys", "ssh-xmss@openssh.com", keys ) # /tmp/etc/ssh/userkeys/badguy = ecdsa authorized_keys2 = self.create_global_authorized_file( "etc/ssh/userkeys/badguy", "ecdsa", keys, tmpdir ) # /tmp/sshd_config options = str( tmpdir.join("etc", "ssh", "userkeys", "%u .ssh", "authorized_keys") ) sshd_config = self.create_sshd_config(options, tmpdir) self.execute_and_check(user_bobby, sshd_config, authorized_keys, keys) self.execute_and_check( user_badguy, sshd_config, authorized_keys2, keys ) @pytest.mark.parametrize("inverted", [False, True]) @patch("cloudinit.util.get_user_groups") @patch("cloudinit.ssh_util.pwd.getpwnam") @patch("cloudinit.util.get_permissions") @patch("cloudinit.util.get_owner") @patch("cloudinit.util.get_group") def test_two_users_hardcoded_single_user_file( self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, m_get_user_groups, inverted, tmpdir, ): keys = {} users = {} mock_permissions = { tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700), tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700), tmpdir.join("home", "bobby", ".ssh", "authorized_keys"): ( "bobby", "bobby", 0o600, ), tmpdir.join("home", "suzie"): ("suzie", "suzie", 0o700), tmpdir.join("home", "suzie", ".ssh"): ("suzie", "suzie", 0o700), tmpdir.join("home", "suzie", ".ssh", "authorized_keys"): ( "suzie", "suzie", 0o600, ), } user_bobby = "bobby" user_suzie = "suzie" homes = self.create_fake_users( [user_bobby, user_suzie], mock_permissions, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, users, tmpdir, ) home_bobby = homes[0] home_suzie = homes[1] m_get_user_groups.side_effect = mock_get_user_groups # /tmp/home/bobby/.ssh/authorized_keys = rsa authorized_keys = self.create_user_authorized_file( home_bobby, "authorized_keys", "rsa", keys ) # /tmp/home/suzie/.ssh/authorized_keys = ssh-xmss@openssh.com authorized_keys2 = self.create_user_authorized_file( home_suzie, "authorized_keys", "ssh-xmss@openssh.com", keys ) # /tmp/sshd_config if not inverted: expected_keys = authorized_keys else: expected_keys = authorized_keys2 options = "%s" % (expected_keys) sshd_config = self.create_sshd_config(options, tmpdir) if not inverted: expected_bobby = expected_keys expected_suzie = "%s/.ssh/authorized_keys" % home_suzie else: expected_bobby = "%s/.ssh/authorized_keys" % home_bobby expected_suzie = expected_keys self.execute_and_check(user_bobby, sshd_config, expected_bobby, keys) self.execute_and_check(user_suzie, sshd_config, expected_suzie, keys) @patch("cloudinit.util.get_user_groups") @patch("cloudinit.ssh_util.pwd.getpwnam") @patch("cloudinit.util.get_permissions") @patch("cloudinit.util.get_owner") @patch("cloudinit.util.get_group") def test_two_users_hardcoded_user_files( self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, m_get_user_groups, tmpdir, ): keys = {} users = {} mock_permissions = { tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700), tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700), tmpdir.join("home", "bobby", ".ssh", "authorized_keys"): ( "bobby", "bobby", 0o600, ), tmpdir.join("home", "suzie"): ("suzie", "suzie", 0o700), tmpdir.join("home", "suzie", ".ssh"): ("suzie", "suzie", 0o700), tmpdir.join("home", "suzie", ".ssh", "authorized_keys"): ( "suzie", "suzie", 0o600, ), } user_bobby = "bobby" user_suzie = "suzie" homes = self.create_fake_users( [user_bobby, user_suzie], mock_permissions, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, users, tmpdir, ) home_bobby = homes[0] home_suzie = homes[1] m_get_user_groups.side_effect = mock_get_user_groups # /tmp/home/bobby/.ssh/authorized_keys = rsa authorized_keys = self.create_user_authorized_file( home_bobby, "authorized_keys", "rsa", keys ) # /tmp/home/suzie/.ssh/authorized_keys = ssh-xmss@openssh.com authorized_keys2 = self.create_user_authorized_file( home_suzie, "authorized_keys", "ssh-xmss@openssh.com", keys ) # /tmp/etc/ssh/authorized_keys = ecdsa authorized_keys_global = self.create_global_authorized_file( "etc/ssh/authorized_keys", "ecdsa", keys, tmpdir ) # /tmp/sshd_config options = "%s %s %s" % ( authorized_keys_global, authorized_keys, authorized_keys2, ) sshd_config = self.create_sshd_config(options, tmpdir) self.execute_and_check(user_bobby, sshd_config, authorized_keys, keys) self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys) # vi: ts=4 expandtab