[2] | 1 | import urllib |
---|
| 2 | import galaxy.model |
---|
| 3 | from galaxy.model.orm import * |
---|
| 4 | from galaxy.model.mapping import context as sa_session |
---|
| 5 | from base.twilltestcase import * |
---|
| 6 | |
---|
| 7 | class TestHistory( TwillTestCase ): |
---|
| 8 | |
---|
| 9 | def test_000_history_behavior_between_logout_login( self ): |
---|
| 10 | """Testing history behavior between logout and login""" |
---|
| 11 | self.logout() |
---|
| 12 | self.history_options() |
---|
| 13 | # Create a new, empty history named anonymous |
---|
| 14 | name = 'anonymous' |
---|
| 15 | self.new_history( name=name ) |
---|
| 16 | global anonymous_history |
---|
| 17 | anonymous_history = sa_session.query( galaxy.model.History ) \ |
---|
| 18 | .filter( and_( galaxy.model.History.table.c.deleted==False, |
---|
| 19 | galaxy.model.History.table.c.name==name ) ) \ |
---|
| 20 | .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ |
---|
| 21 | .first() |
---|
| 22 | assert anonymous_history is not None, "Problem retrieving anonymous_history from database" |
---|
| 23 | # Upload a dataset to anonymous_history so it will be set as the current history after login |
---|
| 24 | self.upload_file( '1.bed', dbkey='hg18' ) |
---|
| 25 | self.login( email='test1@bx.psu.edu', username='regular-user1' ) |
---|
| 26 | global regular_user1 |
---|
| 27 | regular_user1 = sa_session.query( galaxy.model.User ) \ |
---|
| 28 | .filter( galaxy.model.User.table.c.email=='test1@bx.psu.edu' ) \ |
---|
| 29 | .first() |
---|
| 30 | assert regular_user1 is not None, 'Problem retrieving user with email "test1@bx.psu.edu" from the database' |
---|
| 31 | # Current history should be anonymous_history |
---|
| 32 | self.check_history_for_string( name ) |
---|
| 33 | self.logout() |
---|
| 34 | # Login as the same user again to ensure anonymous_history is still the current history |
---|
| 35 | self.login( email=regular_user1.email ) |
---|
| 36 | self.check_history_for_string( name ) |
---|
| 37 | self.logout() |
---|
| 38 | self.login( email='test2@bx.psu.edu', username='regular-user2' ) |
---|
| 39 | global regular_user2 |
---|
| 40 | regular_user2 = sa_session.query( galaxy.model.User ) \ |
---|
| 41 | .filter( galaxy.model.User.table.c.email=='test2@bx.psu.edu' ) \ |
---|
| 42 | .first() |
---|
| 43 | assert regular_user2 is not None, 'Problem retrieving user with email "test2@bx.psu.edu" from the database' |
---|
| 44 | self.logout() |
---|
| 45 | self.login( email='test3@bx.psu.edu', username='regular-user3' ) |
---|
| 46 | global regular_user3 |
---|
| 47 | regular_user3 = sa_session.query( galaxy.model.User ) \ |
---|
| 48 | .filter( galaxy.model.User.table.c.email=='test3@bx.psu.edu' ) \ |
---|
| 49 | .first() |
---|
| 50 | assert regular_user3 is not None, 'Problem retrieving user with email "test3@bx.psu.edu" from the database' |
---|
| 51 | self.logout() |
---|
| 52 | self.login( email='test@bx.psu.edu', username='admin-user' ) |
---|
| 53 | global admin_user |
---|
| 54 | admin_user = sa_session.query( galaxy.model.User ) \ |
---|
| 55 | .filter( galaxy.model.User.table.c.email=='test@bx.psu.edu' ) \ |
---|
| 56 | .one() |
---|
| 57 | assert admin_user is not None, 'Problem retrieving user with email "test@bx.psu.edu" from the database' |
---|
| 58 | # Get the admin_user private role for later use |
---|
| 59 | global admin_user_private_role |
---|
| 60 | admin_user_private_role = None |
---|
| 61 | for role in admin_user.all_roles(): |
---|
| 62 | if role.name == admin_user.email and role.description == 'Private Role for %s' % admin_user.email: |
---|
| 63 | admin_user_private_role = role |
---|
| 64 | break |
---|
| 65 | if not admin_user_private_role: |
---|
| 66 | raise AssertionError( "Private role not found for user '%s'" % admin_user.email ) |
---|
| 67 | historyA = sa_session.query( galaxy.model.History ) \ |
---|
| 68 | .filter( and_( galaxy.model.History.table.c.deleted==False, |
---|
| 69 | galaxy.model.History.table.c.user_id==admin_user.id ) ) \ |
---|
| 70 | .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ |
---|
| 71 | .first() |
---|
| 72 | assert historyA is not None, "Problem retrieving historyA from database" |
---|
| 73 | assert not historyA.deleted, "After login, historyA is deleted" |
---|
| 74 | # Make sure the last used history is set for the next session after login |
---|
| 75 | self.logout() |
---|
| 76 | self.login( email=admin_user.email ) |
---|
| 77 | historyB = sa_session.query( galaxy.model.History ) \ |
---|
| 78 | .filter( and_( galaxy.model.History.table.c.deleted==False, |
---|
| 79 | galaxy.model.History.table.c.user_id==admin_user.id ) ) \ |
---|
| 80 | .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ |
---|
| 81 | .first() |
---|
| 82 | assert historyB is not None, "Problem retrieving historyB from database" |
---|
| 83 | assert historyA.id == historyB.id, "After the same user logged out and back in, their last used history was not associated with their new session" |
---|
| 84 | def test_005_deleting_histories( self ): |
---|
| 85 | """Testing deleting histories""" |
---|
| 86 | # Logged in as admin_user |
---|
| 87 | historyB = sa_session.query( galaxy.model.History ) \ |
---|
| 88 | .filter( and_( galaxy.model.History.table.c.deleted==False, |
---|
| 89 | galaxy.model.History.table.c.user_id==admin_user.id ) ) \ |
---|
| 90 | .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ |
---|
| 91 | .first() |
---|
| 92 | assert historyB is not None, "Problem retrieving historyB from database" |
---|
| 93 | self.delete_history( self.security.encode_id( historyB.id ) ) |
---|
| 94 | sa_session.refresh( historyB ) |
---|
| 95 | if not historyB.deleted: |
---|
| 96 | raise AssertionError, "Problem deleting history id %d" % historyB.id |
---|
| 97 | # Since we deleted the current history, make sure the history frame was refreshed |
---|
| 98 | self.check_history_for_string( 'Your history is empty.' ) |
---|
| 99 | # We'll now test deleting a list of histories |
---|
| 100 | # After deleting the current history, a new one should have been created |
---|
| 101 | global history1 |
---|
| 102 | history1 = sa_session.query( galaxy.model.History ) \ |
---|
| 103 | .filter( and_( galaxy.model.History.table.c.deleted==False, |
---|
| 104 | galaxy.model.History.table.c.user_id==admin_user.id ) ) \ |
---|
| 105 | .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ |
---|
| 106 | .first() |
---|
| 107 | assert history1 is not None, "Problem retrieving history1 from database" |
---|
| 108 | self.upload_file( '1.bed', dbkey='hg18' ) |
---|
| 109 | self.new_history( name=urllib.quote( 'history2' ) ) |
---|
| 110 | global history2 |
---|
| 111 | history2 = sa_session.query( galaxy.model.History ) \ |
---|
| 112 | .filter( and_( galaxy.model.History.table.c.deleted==False, |
---|
| 113 | galaxy.model.History.table.c.user_id==admin_user.id ) ) \ |
---|
| 114 | .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ |
---|
| 115 | .first() |
---|
| 116 | assert history2 is not None, "Problem retrieving history2 from database" |
---|
| 117 | self.upload_file( '2.bed', dbkey='hg18' ) |
---|
| 118 | ids = '%s,%s' % ( self.security.encode_id( history1.id ), self.security.encode_id( history2.id ) ) |
---|
| 119 | self.delete_history( ids ) |
---|
| 120 | # Since we deleted the current history, make sure the history frame was refreshed |
---|
| 121 | self.check_history_for_string( 'Your history is empty.' ) |
---|
| 122 | try: |
---|
| 123 | self.view_stored_active_histories( strings_displayed=[ history1.name ] ) |
---|
| 124 | raise AssertionError, "History %s is displayed in the active history list after it was deleted" % history1.name |
---|
| 125 | except: |
---|
| 126 | pass |
---|
| 127 | self.view_stored_deleted_histories( strings_displayed=[ history1.name ] ) |
---|
| 128 | try: |
---|
| 129 | self.view_stored_active_histories( strings_displayed=[ history2.name ] ) |
---|
| 130 | raise AssertionError, "History %s is displayed in the active history list after it was deleted" % history2.name |
---|
| 131 | except: |
---|
| 132 | pass |
---|
| 133 | self.view_stored_deleted_histories( strings_displayed=[ history2.name ] ) |
---|
| 134 | sa_session.refresh( history1 ) |
---|
| 135 | if not history1.deleted: |
---|
| 136 | raise AssertionError, "Problem deleting history id %d" % history1.id |
---|
| 137 | if not history1.default_permissions: |
---|
| 138 | raise AssertionError, "Default permissions were incorrectly deleted from the db for history id %d when it was deleted" % history1.id |
---|
| 139 | sa_session.refresh( history2 ) |
---|
| 140 | if not history2.deleted: |
---|
| 141 | raise AssertionError, "Problem deleting history id %d" % history2.id |
---|
| 142 | if not history2.default_permissions: |
---|
| 143 | raise AssertionError, "Default permissions were incorrectly deleted from the db for history id %d when it was deleted" % history2.id |
---|
| 144 | # Current history is empty |
---|
| 145 | self.history_options( user=True ) |
---|
| 146 | def test_010_history_rename( self ): |
---|
| 147 | """Testing renaming a history""" |
---|
| 148 | # Logged in as admin_user |
---|
| 149 | global history3 |
---|
| 150 | history3 = sa_session.query( galaxy.model.History ) \ |
---|
| 151 | .filter( galaxy.model.History.table.c.deleted==False ) \ |
---|
| 152 | .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ |
---|
| 153 | .first() |
---|
| 154 | assert history3 is not None, "Problem retrieving history3 from database" |
---|
| 155 | if history3.deleted: |
---|
| 156 | raise AssertionError, "History id %d deleted when it should not be" % latest_history.id |
---|
| 157 | self.rename_history( self.security.encode_id( history3.id ), history3.name, new_name=urllib.quote( 'history 3' ) ) |
---|
| 158 | sa_session.refresh( history3 ) |
---|
| 159 | def test_015_history_list( self ): |
---|
| 160 | """Testing viewing previously stored active histories""" |
---|
| 161 | # Logged in as admin_user |
---|
| 162 | self.view_stored_active_histories() |
---|
| 163 | def test_020_share_current_history( self ): |
---|
| 164 | """Testing sharing the current history which contains only public datasets""" |
---|
| 165 | # Logged in as admin_user |
---|
| 166 | # Test sharing an empty history - current history is history3 |
---|
| 167 | self.share_current_history( regular_user1.email, |
---|
| 168 | strings_displayed=[ history3.name ], |
---|
| 169 | strings_displayed_after_submit=[ 'You cannot share an empty history.' ] ) |
---|
| 170 | # Make history3 sharable by adding a dataset |
---|
| 171 | self.upload_file( '1.bed', dbkey='hg18' ) |
---|
| 172 | # Current history is no longer empty |
---|
| 173 | self.history_options( user=True, active_datasets=True, activatable_datasets=True ) |
---|
| 174 | # Test sharing history3 with yourself |
---|
| 175 | self.share_current_history( admin_user.email, |
---|
| 176 | strings_displayed=[ history3.name ], |
---|
| 177 | strings_displayed_after_submit=[ 'You cannot send histories to yourself.' ] ) |
---|
| 178 | # Share history3 with 1 valid user |
---|
| 179 | self.share_current_history( regular_user1.email, |
---|
| 180 | strings_displayed=[ history3.name ] ) |
---|
| 181 | # Check out list of histories to make sure history3 was shared |
---|
| 182 | self.view_stored_active_histories( strings_displayed=[ 'operation=share' ] ) |
---|
| 183 | # Make history3 accessible via link. |
---|
| 184 | self.make_accessible_via_link( self.security.encode_id( history3.id ), |
---|
| 185 | strings_displayed=[ 'Make History Accessible via Link' ], |
---|
| 186 | strings_displayed_after_submit=[ 'Anyone can view and import this history' ] ) |
---|
| 187 | # Make sure history3 is now accessible. |
---|
| 188 | sa_session.refresh( history3 ) |
---|
| 189 | if not history3.importable: |
---|
| 190 | raise AssertionError, "History 3 is not marked as importable after make_accessible_via_link" |
---|
| 191 | # Try importing history3 |
---|
| 192 | self.import_history_via_url( self.security.encode_id( history3.id ), |
---|
| 193 | admin_user.email, |
---|
| 194 | strings_displayed_after_submit=[ 'You cannot import your own history.' ] ) |
---|
| 195 | # Disable access via link for history3. |
---|
| 196 | self.disable_access_via_link( self.security.encode_id( history3.id ), |
---|
| 197 | strings_displayed=[ 'Anyone can view and import this history' ], |
---|
| 198 | strings_displayed_after_submit=[ 'Make History Accessible via Link' ] ) |
---|
| 199 | # Try importing history3 after disabling access via link. To do this, need to login as regular user 2, who cannot access |
---|
| 200 | # history via sharing or via link. |
---|
| 201 | self.logout() |
---|
| 202 | self.login( email=regular_user2.email ) |
---|
| 203 | self.import_history_via_url( self.security.encode_id( history3.id ), |
---|
| 204 | admin_user.email, |
---|
| 205 | strings_displayed_after_submit=[ 'History is not accessible to current user' ] ) |
---|
| 206 | self.logout() |
---|
| 207 | self.login( email=admin_user.email ) |
---|
| 208 | # Test sharing history3 with an invalid user |
---|
| 209 | self.share_current_history( 'jack@jill.com', |
---|
| 210 | strings_displayed_after_submit=[ 'jack@jill.com is not a valid Galaxy user.' ] ) |
---|
| 211 | def test_025_delete_shared_current_history( self ): |
---|
| 212 | """Testing deleting the current history after it was shared""" |
---|
| 213 | # Logged in as admin_user |
---|
| 214 | self.delete_current_history( strings_displayed=[ "History (%s) has been shared with others, unshare it before deleting it." % history3.name ] ) |
---|
| 215 | def test_030_clone_shared_history( self ): |
---|
| 216 | """Testing cloning a shared history""" |
---|
| 217 | # logged in as admin user |
---|
| 218 | self.logout() |
---|
| 219 | self.login( email=regular_user1.email ) |
---|
| 220 | # Shared history3 affects history options |
---|
| 221 | self.history_options( user=True, histories_shared_by_others=True ) |
---|
| 222 | # Shared history3 should be in regular_user1's list of shared histories |
---|
| 223 | self.view_shared_histories( strings_displayed=[ history3.name, admin_user.email ] ) |
---|
| 224 | self.clone_history( self.security.encode_id( history3.id ), |
---|
| 225 | 'activatable', |
---|
| 226 | strings_displayed_after_submit=[ 'is now included in your previously stored histories.' ] ) |
---|
| 227 | global history3_clone1 |
---|
| 228 | history3_clone1 = sa_session.query( galaxy.model.History ) \ |
---|
| 229 | .filter( and_( galaxy.model.History.table.c.deleted==False, |
---|
| 230 | galaxy.model.History.table.c.user_id==regular_user1.id ) ) \ |
---|
| 231 | .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ |
---|
| 232 | .first() |
---|
| 233 | assert history3_clone1 is not None, "Problem retrieving history3_clone1 from database" |
---|
| 234 | # Check list of histories to make sure shared history3 was cloned |
---|
| 235 | strings_displayed=[ "Clone of '%s' shared by '%s'" % ( history3.name, admin_user.email ) ] |
---|
| 236 | self.view_stored_active_histories( strings_displayed=strings_displayed ) |
---|
| 237 | def test_035_clone_current_history( self ): |
---|
| 238 | """Testing cloning the current history""" |
---|
| 239 | # logged in as regular_user1 |
---|
| 240 | self.logout() |
---|
| 241 | self.login( email=admin_user.email ) |
---|
| 242 | # Current history should be history3, add more datasets to history3, then delete them so we can |
---|
| 243 | # test cloning activatable datasets as well as only the active datasets |
---|
| 244 | self.upload_file( '2.bed', dbkey='hg18' ) |
---|
| 245 | hda_2_bed = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ |
---|
| 246 | .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id==history3.id, |
---|
| 247 | galaxy.model.HistoryDatasetAssociation.table.c.name=='2.bed' ) ) \ |
---|
| 248 | .first() |
---|
| 249 | assert hda_2_bed is not None, "Problem retrieving hda_2_bed from database" |
---|
| 250 | self.delete_history_item( str( hda_2_bed.id ) ) |
---|
| 251 | self.upload_file( '3.bed', dbkey='hg18' ) |
---|
| 252 | hda_3_bed = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ |
---|
| 253 | .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id==history3.id, |
---|
| 254 | galaxy.model.HistoryDatasetAssociation.table.c.name=='3.bed' ) ) \ |
---|
| 255 | .first() |
---|
| 256 | assert hda_3_bed is not None, "Problem retrieving hda_3_bed from database" |
---|
| 257 | self.delete_history_item( str( hda_3_bed.id ) ) |
---|
| 258 | # Test cloning activatable datasets |
---|
| 259 | self.clone_history( self.security.encode_id( history3.id ), |
---|
| 260 | 'activatable', |
---|
| 261 | strings_displayed_after_submit=['is now included in your previously stored histories.' ] ) |
---|
| 262 | global history3_clone2 |
---|
| 263 | history3_clone2 = sa_session.query( galaxy.model.History ) \ |
---|
| 264 | .filter( and_( galaxy.model.History.table.c.deleted==False, |
---|
| 265 | galaxy.model.History.table.c.user_id==admin_user.id ) ) \ |
---|
| 266 | .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ |
---|
| 267 | .first() |
---|
| 268 | assert history3_clone2 is not None, "Problem retrieving history3_clone2 from database" |
---|
| 269 | # Check list of histories to make sure shared history3 was cloned |
---|
| 270 | self.view_stored_active_histories( strings_displayed=[ "Clone of '%s'" % history3.name ] ) |
---|
| 271 | # Switch to the cloned history to make sure activatable datasets were cloned |
---|
| 272 | self.switch_history( id=self.security.encode_id( history3_clone2.id ), name=history3_clone2.name ) |
---|
| 273 | hda_2_bed = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ |
---|
| 274 | .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id==history3_clone2.id, |
---|
| 275 | galaxy.model.HistoryDatasetAssociation.table.c.name=='2.bed' ) ) \ |
---|
| 276 | .first() |
---|
| 277 | assert hda_2_bed is not None, "Problem retrieving hda_2_bed from database" |
---|
| 278 | hda_3_bed = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ |
---|
| 279 | .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id==history3_clone2.id, |
---|
| 280 | galaxy.model.HistoryDatasetAssociation.table.c.name=='3.bed' ) ) \ |
---|
| 281 | .first() |
---|
| 282 | assert hda_3_bed is not None, "Problem retrieving hda_3_bed from database" |
---|
| 283 | # Make sure the deleted datasets are included in the cloned history |
---|
| 284 | check_str = 'This dataset has been deleted. Click undelete id=%d"' % hda_2_bed.id |
---|
| 285 | self.check_history_for_string( check_str, show_deleted=True ) |
---|
| 286 | check_str = 'This dataset has been deleted. Click undelete id=%d"' % hda_3_bed.id |
---|
| 287 | self.check_history_for_string( check_str, show_deleted=True ) |
---|
| 288 | # Test cloning only active datasets |
---|
| 289 | self.clone_history( self.security.encode_id( history3.id ), |
---|
| 290 | 'active', |
---|
| 291 | strings_displayed_after_submit=[ 'is now included in your previously stored histories.' ] ) |
---|
| 292 | global history3_clone3 |
---|
| 293 | history3_clone3 = sa_session.query( galaxy.model.History ) \ |
---|
| 294 | .filter( and_( galaxy.model.History.table.c.deleted==False, |
---|
| 295 | galaxy.model.History.table.c.user_id==admin_user.id ) ) \ |
---|
| 296 | .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ |
---|
| 297 | .first() |
---|
| 298 | assert history3_clone3 is not None, "Problem retrieving history3_clone3 from database" |
---|
| 299 | # Check list of histories to make sure shared history3 was cloned |
---|
| 300 | self.view_stored_active_histories( strings_displayed = ["Clone of '%s'" % history3.name ] ) |
---|
| 301 | # Switch to the cloned history to make sure activatable datasets were cloned |
---|
| 302 | self.switch_history( id=self.security.encode_id( history3_clone3.id ) ) |
---|
| 303 | # Make sure the deleted datasets are NOT included in the cloned history |
---|
| 304 | try: |
---|
| 305 | self.check_history_for_string( 'This dataset has been deleted.', show_deleted=True ) |
---|
| 306 | raise AssertionError, "Deleted datasets incorrectly included in cloned history history3_clone3" |
---|
| 307 | except: |
---|
| 308 | pass |
---|
| 309 | def test_040_sharing_mulitple_histories_with_multiple_users( self ): |
---|
| 310 | """Testing sharing multiple histories containing only public datasets with multiple users""" |
---|
| 311 | # Logged in as admin_user |
---|
| 312 | self.new_history() |
---|
| 313 | global history4 |
---|
| 314 | history4 = sa_session.query( galaxy.model.History ) \ |
---|
| 315 | .filter( and_( galaxy.model.History.table.c.deleted==False, |
---|
| 316 | galaxy.model.History.table.c.user_id==admin_user.id ) ) \ |
---|
| 317 | .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ |
---|
| 318 | .first() |
---|
| 319 | assert history4 is not None, "Problem retrieving history4 from database" |
---|
| 320 | self.rename_history( self.security.encode_id( history4.id ), history4.name, new_name=urllib.quote( 'history 4' ) ) |
---|
| 321 | sa_session.refresh( history4 ) |
---|
| 322 | # Galaxy's new history sharing code does not yet support sharing multiple histories; when support for sharing multiple histories is added, |
---|
| 323 | # this test will be uncommented and updated. |
---|
| 324 | """ |
---|
| 325 | self.upload_file( '2.bed', dbkey='hg18' ) |
---|
| 326 | ids = '%s,%s' % ( self.security.encode_id( history3.id ), self.security.encode_id( history4.id ) ) |
---|
| 327 | emails = '%s,%s' % ( regular_user2.email, regular_user3.email ) |
---|
| 328 | self.share_histories_with_users( ids, |
---|
| 329 | emails, |
---|
| 330 | strings_displayed=[ 'Share 2 histories', history4.name ] ) |
---|
| 331 | self.logout() |
---|
| 332 | self.login( email=regular_user2.email ) |
---|
| 333 | # Shared history3 should be in regular_user2's list of shared histories |
---|
| 334 | self.view_shared_histories( strings_displayed=[ history3.name, admin_user.email ] ) |
---|
| 335 | self.logout() |
---|
| 336 | self.login( email=regular_user3.email ) |
---|
| 337 | # Shared history3 should be in regular_user3's list of shared histories |
---|
| 338 | self.view_shared_histories( cstrings_displayed=[ history3.name, admin_user.email ] ) |
---|
| 339 | """ |
---|
| 340 | def test_045_change_permissions_on_current_history( self ): |
---|
| 341 | """Testing changing permissions on the current history""" |
---|
| 342 | # Logged in as regular_user3 |
---|
| 343 | self.logout() |
---|
| 344 | self.login( email=admin_user.email ) |
---|
| 345 | # Current history is history4 |
---|
| 346 | self.new_history() |
---|
| 347 | global history5 |
---|
| 348 | history5 = sa_session.query( galaxy.model.History ) \ |
---|
| 349 | .filter( and_( galaxy.model.History.table.c.deleted==False, |
---|
| 350 | galaxy.model.History.table.c.user_id==admin_user.id ) ) \ |
---|
| 351 | .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ |
---|
| 352 | .first() |
---|
| 353 | assert history5 is not None, "Problem retrieving history5 from database" |
---|
| 354 | self.rename_history( self.security.encode_id( history5.id ), history5.name, new_name=urllib.quote( 'history 5' ) ) |
---|
| 355 | # Current history is hostory5 |
---|
| 356 | sa_session.refresh( history5 ) |
---|
| 357 | # Due to the limitations of twill ( not functional with the permissions forms ), we're forced |
---|
| 358 | # to do this manually. At this point, we just want to restrict the access permission on history5 |
---|
| 359 | # to the admin_user |
---|
| 360 | global access_action |
---|
| 361 | access_action = galaxy.model.Dataset.permitted_actions.DATASET_ACCESS.action |
---|
| 362 | dhp = galaxy.model.DefaultHistoryPermissions( history5, access_action, admin_user_private_role ) |
---|
| 363 | sa_session.add( dhp ) |
---|
| 364 | sa_session.flush() |
---|
| 365 | sa_session.refresh( history5 ) |
---|
| 366 | global history5_default_permissions |
---|
| 367 | history5_default_permissions = [ dhp.action for dhp in history5.default_permissions ] |
---|
| 368 | # Sort for later comparison |
---|
| 369 | history5_default_permissions.sort() |
---|
| 370 | self.upload_file( '1.bed', dbkey='hg18' ) |
---|
| 371 | history5_dataset1 = None |
---|
| 372 | for hda in history5.datasets: |
---|
| 373 | if hda.name == '1.bed': |
---|
| 374 | history5_dataset1 = hda.dataset |
---|
| 375 | break |
---|
| 376 | assert history5_dataset1 is not None, "Problem retrieving history5_dataset1 from the database" |
---|
| 377 | # The permissions on the dataset should be restricted from sharing with anyone due to the |
---|
| 378 | # inherited history permissions |
---|
| 379 | dataset_permissions = [ a.action for a in history5_dataset1.actions ] |
---|
| 380 | dataset_permissions.sort() |
---|
| 381 | if dataset_permissions != history5_default_permissions: |
---|
| 382 | err_msg = "Dataset permissions for history5_dataset1 (%s) were not correctly inherited from history permissions (%s)" \ |
---|
| 383 | % ( str( dataset_permissions ), str( history5_default_permissions ) ) |
---|
| 384 | raise AssertionError, err_msg |
---|
| 385 | # Make sure when we logout and login, the history default permissions are preserved |
---|
| 386 | self.logout() |
---|
| 387 | self.login( email=admin_user.email ) |
---|
| 388 | sa_session.refresh( history5 ) |
---|
| 389 | current_history_permissions = [ dhp.action for dhp in history5.default_permissions ] |
---|
| 390 | current_history_permissions.sort() |
---|
| 391 | if current_history_permissions != history5_default_permissions: |
---|
| 392 | raise AssertionError, "With logout and login, the history default permissions are not preserved" |
---|
| 393 | def test_050_sharing_restricted_history_by_making_datasets_public( self ): |
---|
| 394 | """Testing sharing a restricted history by making the datasets public""" |
---|
| 395 | # Logged in as admin_user |
---|
| 396 | action_strings_displayed = [ 'The following datasets can be shared with %s by updating their permissions' % regular_user1.email ] |
---|
| 397 | # Current history is history5 |
---|
| 398 | self.share_current_history( regular_user1.email, |
---|
| 399 | action='public', |
---|
| 400 | action_strings_displayed=action_strings_displayed ) |
---|
| 401 | self.logout() |
---|
| 402 | self.login( email=regular_user1.email ) |
---|
| 403 | # Shared history5 should be in regular_user1's list of shared histories |
---|
| 404 | self.view_shared_histories( strings_displayed=[ history5.name, admin_user.email ] ) |
---|
| 405 | # Clone restricted history5 |
---|
| 406 | self.clone_history( self.security.encode_id( history5.id ), |
---|
| 407 | 'activatable', |
---|
| 408 | strings_displayed_after_submit=[ 'is now included in your previously stored histories.' ] ) |
---|
| 409 | global history5_clone1 |
---|
| 410 | history5_clone1 = sa_session.query( galaxy.model.History ) \ |
---|
| 411 | .filter( and_( galaxy.model.History.table.c.deleted==False, |
---|
| 412 | galaxy.model.History.table.c.user_id==regular_user1.id ) ) \ |
---|
| 413 | .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ |
---|
| 414 | .first() |
---|
| 415 | assert history5_clone1 is not None, "Problem retrieving history5_clone1 from database" |
---|
| 416 | # Check list of histories to make sure shared history5 was cloned |
---|
| 417 | self.view_stored_active_histories( strings_displayed=[ "Clone of '%s'" % history5.name ] ) |
---|
| 418 | # Make sure the dataset is accessible |
---|
| 419 | self.switch_history( id=self.security.encode_id( history5_clone1.id ), name=history5_clone1.name ) |
---|
| 420 | self.check_history_for_string( 'chr1' ) |
---|
| 421 | self.logout() |
---|
| 422 | self.login( email=admin_user.email ) |
---|
| 423 | def test_055_sharing_restricted_history_by_making_new_sharing_role( self ): |
---|
| 424 | """Testing sharing a restricted history by associating a new sharing role with protected datasets""" |
---|
| 425 | # At this point, history5 should have 1 item, 1.bed, which is public. We'll add another |
---|
| 426 | # item which will be private to admin_user due to the permissions on history5 |
---|
| 427 | self.upload_file( '2.bed', dbkey='hg18' ) |
---|
| 428 | strings_displayed_after_submit = [ 'The following datasets can be shared with %s with no changes' % regular_user2.email, |
---|
| 429 | 'The following datasets can be shared with %s by updating their permissions' % regular_user2.email ] |
---|
| 430 | self.share_current_history( regular_user2.email, |
---|
| 431 | strings_displayed_after_submit=strings_displayed_after_submit, |
---|
| 432 | action='private' ) |
---|
| 433 | # We should now have a new sharing role |
---|
| 434 | global sharing_role |
---|
| 435 | role_name = 'Sharing role for: %s, %s' % ( admin_user.email, regular_user2.email ) |
---|
| 436 | sharing_role = sa_session.query( galaxy.model.Role ).filter( galaxy.model.Role.table.c.name==role_name ).first() |
---|
| 437 | if not sharing_role: |
---|
| 438 | # May have created a sharing role in a previous functional test suite from the opposite direction. |
---|
| 439 | role_name = 'Sharing role for: %s, %s' % ( regular_user2.email, admin_user.email ) |
---|
| 440 | sharing_role = sa_session.query( galaxy.model.Role ) \ |
---|
| 441 | .filter( and_( galaxy.model.Role.table.c.type==role_type, |
---|
| 442 | galaxy.model.Role.table.c.name==role_name ) ) \ |
---|
| 443 | .first() |
---|
| 444 | if not sharing_role: |
---|
| 445 | raise AssertionError( "Privately sharing a dataset did not properly create a sharing role" ) |
---|
| 446 | # The DATASET_ACCESS permission on 2.bed was originally associated with admin_user's private role. |
---|
| 447 | # Since we created a new sharing role for 2.bed, the original permission should have been eliminated, |
---|
| 448 | # replaced with the sharing role. |
---|
| 449 | history5_dataset2 = None |
---|
| 450 | for hda in history5.datasets: |
---|
| 451 | if hda.name == '2.bed': |
---|
| 452 | history5_dataset2 = hda.dataset |
---|
| 453 | break |
---|
| 454 | assert history5_dataset2 is not None, "Problem retrieving history5_dataset2 from the database" |
---|
| 455 | for dp in history5_dataset2.actions: |
---|
| 456 | if dp.action == 'access': |
---|
| 457 | assert dp.role == sharing_role, "Associating new sharing role with history5_dataset2 did not correctly eliminate original DATASET ACCESS permissions" |
---|
| 458 | self.logout() |
---|
| 459 | self.login( email=regular_user2.email ) |
---|
| 460 | # Shared history5 should be in regular_user2's list of shared histories |
---|
| 461 | self.view_shared_histories( strings_displayed=[ history5.name, admin_user.email ] ) |
---|
| 462 | # Clone restricted history5 |
---|
| 463 | self.clone_history( self.security.encode_id( history5.id ), |
---|
| 464 | 'activatable', |
---|
| 465 | strings_displayed_after_submit=[ 'is now included in your previously stored histories.' ] ) |
---|
| 466 | global history5_clone2 |
---|
| 467 | history5_clone2 = sa_session.query( galaxy.model.History ) \ |
---|
| 468 | .filter( and_( galaxy.model.History.table.c.deleted==False, |
---|
| 469 | galaxy.model.History.table.c.user_id==regular_user2.id ) ) \ |
---|
| 470 | .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ |
---|
| 471 | .first() |
---|
| 472 | assert history5_clone2 is not None, "Problem retrieving history5_clone2 from database" |
---|
| 473 | # Check list of histories to make sure shared history3 was cloned |
---|
| 474 | self.view_stored_active_histories( strings_displayed=[ "Clone of '%s'" % history5.name ] ) |
---|
| 475 | # Make sure the dataset is accessible |
---|
| 476 | self.switch_history( id=self.security.encode_id( history5_clone2.id ), name=history5_clone2.name ) |
---|
| 477 | # Make sure both datasets are in the history |
---|
| 478 | self.check_history_for_string( '1.bed' ) |
---|
| 479 | self.check_history_for_string( '2.bed' ) |
---|
| 480 | # Get both new hdas from the db that were created for the shared history |
---|
| 481 | hda_1_bed = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ |
---|
| 482 | .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id==history5_clone2.id, |
---|
| 483 | galaxy.model.HistoryDatasetAssociation.table.c.name=='1.bed' ) ) \ |
---|
| 484 | .first() |
---|
| 485 | assert hda_1_bed is not None, "Problem retrieving hda_1_bed from database" |
---|
| 486 | hda_2_bed = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ |
---|
| 487 | .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id==history5_clone2.id, |
---|
| 488 | galaxy.model.HistoryDatasetAssociation.table.c.name=='2.bed' ) ) \ |
---|
| 489 | .first() |
---|
| 490 | assert hda_2_bed is not None, "Problem retrieving hda_2_bed from database" |
---|
| 491 | # Make sure 1.bed is accessible since it is public |
---|
| 492 | self.display_history_item( str( hda_1_bed.id ), strings_displayed=[ 'chr1' ] ) |
---|
| 493 | # Make sure 2.bed is accessible since it is associated with a sharing role |
---|
| 494 | self.display_history_item( str( hda_2_bed.id ), strings_displayed=[ 'chr1' ] ) |
---|
| 495 | # Delete the clone so the next test will be valid |
---|
| 496 | self.delete_history( id=self.security.encode_id( history5_clone2.id ) ) |
---|
| 497 | def test_060_sharing_restricted_history_with_multiple_users_by_changing_no_permissions( self ): |
---|
| 498 | """Testing sharing a restricted history with multiple users, making no permission changes""" |
---|
| 499 | # Logged in as regular_user2 |
---|
| 500 | self.logout() |
---|
| 501 | self.login( email=admin_user.email ) |
---|
| 502 | # History5 can be shared with any user, since it contains a public dataset ( 1.bed ). However, only |
---|
| 503 | # regular_user2 should be able to access history5's 2.bed dataset since it is associated with a |
---|
| 504 | # sharing role, and regular_user3 should be able to access history5's 1.bed, but not 2.bed even |
---|
| 505 | # though they can see it in their shared history. |
---|
| 506 | # We first need to unshare history5 from regular_user2 so that we can re-share it. |
---|
| 507 | self.unshare_history( self.security.encode_id( history5.id ), |
---|
| 508 | self.security.encode_id( regular_user2.id ), |
---|
| 509 | strings_displayed=[ regular_user1.email, regular_user2.email ] ) |
---|
| 510 | # Make sure the history was unshared correctly |
---|
| 511 | self.logout() |
---|
| 512 | self.login( email=regular_user2.email ) |
---|
| 513 | self.visit_page( "root/history_options" ) |
---|
| 514 | try: |
---|
| 515 | self.check_page_for_string( 'List</a> histories shared with you by others' ) |
---|
| 516 | raise AssertionError, "history5 still shared with regular_user2 after unshaing it with that user." |
---|
| 517 | except: |
---|
| 518 | pass |
---|
| 519 | self.logout() |
---|
| 520 | self.login( admin_user.email ) |
---|
| 521 | email = '%s,%s' % ( regular_user2.email, regular_user3.email ) |
---|
| 522 | strings_displayed_after_submit = [ 'The following datasets can be shared with %s with no changes' % email, |
---|
| 523 | 'The following datasets can be shared with %s by updating their permissions' % email ] |
---|
| 524 | # history5 will be shared with regular_user1, regular_user2 and regular_user3 |
---|
| 525 | self.share_current_history( email, |
---|
| 526 | strings_displayed_after_submit=strings_displayed_after_submit, |
---|
| 527 | action='share_anyway' ) |
---|
| 528 | # Check security on clone of history5 for regular_user2 |
---|
| 529 | self.logout() |
---|
| 530 | self.login( email=regular_user2.email ) |
---|
| 531 | # Shared history5 should be in regular_user2's list of shared histories |
---|
| 532 | self.view_shared_histories( strings_displayed=[ history5.name, admin_user.email ] ) |
---|
| 533 | # Clone restricted history5 |
---|
| 534 | self.clone_history( self.security.encode_id( history5.id ), |
---|
| 535 | 'activatable', |
---|
| 536 | strings_displayed_after_submit=[ 'is now included in your previously stored histories.' ] ) |
---|
| 537 | global history5_clone3 |
---|
| 538 | history5_clone3 = sa_session.query( galaxy.model.History ) \ |
---|
| 539 | .filter( and_( galaxy.model.History.table.c.deleted==False, |
---|
| 540 | galaxy.model.History.table.c.user_id==regular_user2.id ) ) \ |
---|
| 541 | .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ |
---|
| 542 | .first() |
---|
| 543 | assert history5_clone3 is not None, "Problem retrieving history5_clone3 from database" |
---|
| 544 | # Check list of histories to make sure shared history3 was cloned |
---|
| 545 | self.view_stored_active_histories( strings_displayed=[ "Clone of '%s'" % history5.name ] ) |
---|
| 546 | # Make sure the dataset is accessible |
---|
| 547 | self.switch_history( id=self.security.encode_id( history5_clone3.id ), name=history5_clone3.name ) |
---|
| 548 | # Make sure both datasets are in the history |
---|
| 549 | self.check_history_for_string( '1.bed' ) |
---|
| 550 | self.check_history_for_string( '2.bed' ) |
---|
| 551 | # Get both new hdas from the db that were created for the shared history |
---|
| 552 | hda_1_bed = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ |
---|
| 553 | .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id==history5_clone3.id, |
---|
| 554 | galaxy.model.HistoryDatasetAssociation.table.c.name=='1.bed' ) ) \ |
---|
| 555 | .first() |
---|
| 556 | assert hda_1_bed is not None, "Problem retrieving hda_1_bed from database" |
---|
| 557 | hda_2_bed = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ |
---|
| 558 | .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id==history5_clone3.id, |
---|
| 559 | galaxy.model.HistoryDatasetAssociation.table.c.name=='2.bed' ) ) \ |
---|
| 560 | .first() |
---|
| 561 | assert hda_2_bed is not None, "Problem retrieving hda_2_bed from database" |
---|
| 562 | # Make sure 1.bed is accessible since it is public |
---|
| 563 | self.display_history_item( str( hda_1_bed.id ), strings_displayed=[ 'chr1' ] ) |
---|
| 564 | # Make sure 2.bed is accessible since it is associated with a sharing role |
---|
| 565 | self.display_history_item( str( hda_2_bed.id ), strings_displayed=[ 'chr1' ] ) |
---|
| 566 | # Delete the clone so the next test will be valid |
---|
| 567 | self.delete_history( id=self.security.encode_id( history5_clone3.id ) ) |
---|
| 568 | # Check security on clone of history5 for regular_user3 |
---|
| 569 | self.logout() |
---|
| 570 | self.login( email=regular_user3.email ) |
---|
| 571 | # Shared history5 should be in regular_user2's list of shared histories |
---|
| 572 | self.view_shared_histories( strings_displayed=[ history5.name, admin_user.email ] ) |
---|
| 573 | # Clone restricted history5 |
---|
| 574 | self.clone_history( self.security.encode_id( history5.id ), |
---|
| 575 | 'activatable', |
---|
| 576 | strings_displayed_after_submit=[ 'is now included in your previously stored histories.' ] ) |
---|
| 577 | global history5_clone4 |
---|
| 578 | history5_clone4 = sa_session.query( galaxy.model.History ) \ |
---|
| 579 | .filter( and_( galaxy.model.History.table.c.deleted==False, |
---|
| 580 | galaxy.model.History.table.c.user_id==regular_user3.id ) ) \ |
---|
| 581 | .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ |
---|
| 582 | .first() |
---|
| 583 | assert history5_clone4 is not None, "Problem retrieving history5_clone4 from database" |
---|
| 584 | # Check list of histories to make sure shared history3 was cloned |
---|
| 585 | self.view_stored_active_histories( strings_displayed=[ "Clone of '%s'" % history5.name ] ) |
---|
| 586 | # Make sure the dataset is accessible |
---|
| 587 | self.switch_history( id=self.security.encode_id( history5_clone4.id ), name=history5_clone4.name ) |
---|
| 588 | # Make sure both datasets are in the history |
---|
| 589 | self.check_history_for_string( '1.bed' ) |
---|
| 590 | self.check_history_for_string( '2.bed' ) |
---|
| 591 | # Get both new hdas from the db that were created for the shared history |
---|
| 592 | hda_1_bed = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ |
---|
| 593 | .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id==history5_clone4.id, |
---|
| 594 | galaxy.model.HistoryDatasetAssociation.table.c.name=='1.bed' ) ) \ |
---|
| 595 | .first() |
---|
| 596 | assert hda_1_bed is not None, "Problem retrieving hda_1_bed from database" |
---|
| 597 | hda_2_bed = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ |
---|
| 598 | .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id==history5_clone4.id, |
---|
| 599 | galaxy.model.HistoryDatasetAssociation.table.c.name=='2.bed' ) ) \ |
---|
| 600 | .first() |
---|
| 601 | assert hda_2_bed is not None, "Problem retrieving hda_2_bed from database" |
---|
| 602 | # Make sure 1.bed is accessible since it is public |
---|
| 603 | self.display_history_item( str( hda_1_bed.id ), strings_displayed=[ 'chr1' ] ) |
---|
| 604 | # Make sure 2.bed is not accessible since it is protected |
---|
| 605 | try: |
---|
| 606 | self.display_history_item( str( hda_2_bed.id ), strings_displayed=[ 'chr1' ] ) |
---|
| 607 | raise AssertionError, "History item 2.bed is accessible by user %s when is should not be" % regular_user3.email |
---|
| 608 | except: |
---|
| 609 | pass |
---|
| 610 | self.check_history_for_string( 'You do not have permission to view this dataset' ) |
---|
| 611 | # Admin users can view all datasets ( using the history/view feature ), so make sure 2.bed is accessible to the admin |
---|
| 612 | self.logout() |
---|
| 613 | self.login( email=admin_user.email ) |
---|
| 614 | self.view_history( str( hda_2_bed.history_id ), strings_displayed=[ '<td>NM_005997_cds_0_0_chr1_147962193_r</td>' ] ) |
---|
| 615 | self.logout() |
---|
| 616 | self.login( email=regular_user3.email ) |
---|
| 617 | # Delete the clone so the next test will be valid |
---|
| 618 | self.delete_history( id=self.security.encode_id( history5_clone4.id ) ) |
---|
| 619 | def test_065_sharing_private_history_by_choosing_to_not_share( self ): |
---|
| 620 | """Testing sharing a restricted history with multiple users by choosing not to share""" |
---|
| 621 | # Logged in as regular_user3 |
---|
| 622 | self.logout() |
---|
| 623 | self.login( email=admin_user.email ) |
---|
| 624 | # Unshare history5 from regular_user2 |
---|
| 625 | self.unshare_history( self.security.encode_id( history5.id ), |
---|
| 626 | self.security.encode_id( regular_user2.id ), |
---|
| 627 | strings_displayed=[ regular_user1.email, regular_user2.email ] ) |
---|
| 628 | # Unshare history5 from regular_user3 |
---|
| 629 | self.unshare_history( self.security.encode_id( history5.id ), |
---|
| 630 | self.security.encode_id( regular_user3.id ), |
---|
| 631 | strings_displayed=[ regular_user1.email, regular_user3.email ] ) |
---|
| 632 | # Make sure the history was unshared correctly |
---|
| 633 | self.logout() |
---|
| 634 | self.login( email=regular_user2.email ) |
---|
| 635 | self.visit_page( "root/history_options" ) |
---|
| 636 | try: |
---|
| 637 | self.check_page_for_string( 'List</a> histories shared with you by others' ) |
---|
| 638 | raise AssertionError, "history5 still shared with regular_user2 after unshaing it with that user." |
---|
| 639 | except: |
---|
| 640 | pass |
---|
| 641 | self.logout() |
---|
| 642 | self.login( email=regular_user3.email ) |
---|
| 643 | self.visit_page( "root/history_options" ) |
---|
| 644 | try: |
---|
| 645 | self.check_page_for_string( 'List</a> histories shared with you by others' ) |
---|
| 646 | raise AssertionError, "history5 still shared with regular_user3 after unshaing it with that user." |
---|
| 647 | except: |
---|
| 648 | pass |
---|
| 649 | self.logout() |
---|
| 650 | self.login( email=admin_user.email ) |
---|
| 651 | def test_070_history_show_and_hide_deleted_datasets( self ): |
---|
| 652 | """Testing displaying deleted history items""" |
---|
| 653 | # Logged in as admin_user |
---|
| 654 | self.new_history( name=urllib.quote( 'show hide deleted datasets' ) ) |
---|
| 655 | latest_history = sa_session.query( galaxy.model.History ) \ |
---|
| 656 | .filter( and_( galaxy.model.History.table.c.deleted==False, |
---|
| 657 | galaxy.model.History.table.c.user_id==admin_user.id ) ) \ |
---|
| 658 | .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ |
---|
| 659 | .first() |
---|
| 660 | assert latest_history is not None, "Problem retrieving latest_history from database" |
---|
| 661 | self.upload_file('1.bed', dbkey='hg18') |
---|
| 662 | latest_hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ |
---|
| 663 | .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ |
---|
| 664 | .first() |
---|
| 665 | self.home() |
---|
| 666 | self.visit_url( "%s/root/delete?show_deleted_on_refresh=False&id=%s" % ( self.url, str( latest_hda.id ) ) ) |
---|
| 667 | self.check_history_for_string( 'Your history is empty' ) |
---|
| 668 | self.home() |
---|
| 669 | self.visit_url( "%s/history/?show_deleted=True" % self.url ) |
---|
| 670 | self.check_page_for_string( 'This dataset has been deleted.' ) |
---|
| 671 | self.check_page_for_string( '1.bed' ) |
---|
| 672 | self.home() |
---|
| 673 | self.visit_url( "%s/history/?show_deleted=False" % self.url ) |
---|
| 674 | self.check_page_for_string( 'Your history is empty' ) |
---|
| 675 | self.delete_history( self.security.encode_id( latest_history.id ) ) |
---|
| 676 | def test_075_deleting_and_undeleting_history_items( self ): |
---|
| 677 | """Testing deleting and un-deleting history items""" |
---|
| 678 | # logged in as admin_user |
---|
| 679 | # Deleting the current history in the last method created a new history |
---|
| 680 | latest_history = sa_session.query( galaxy.model.History ) \ |
---|
| 681 | .filter( and_( galaxy.model.History.table.c.deleted==False, |
---|
| 682 | galaxy.model.History.table.c.user_id==admin_user.id ) ) \ |
---|
| 683 | .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ |
---|
| 684 | .first() |
---|
| 685 | assert latest_history is not None, "Problem retrieving latest_history from database" |
---|
| 686 | self.rename_history( self.security.encode_id( latest_history.id ), latest_history.name, new_name=urllib.quote( 'delete undelete history items' ) ) |
---|
| 687 | # Add a new history item |
---|
| 688 | self.upload_file( '1.bed', dbkey='hg15' ) |
---|
| 689 | latest_hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ |
---|
| 690 | .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ |
---|
| 691 | .first() |
---|
| 692 | self.home() |
---|
| 693 | self.visit_url( "%s/history/?show_deleted=False" % self.url ) |
---|
| 694 | self.check_page_for_string( '1.bed' ) |
---|
| 695 | self.check_page_for_string( 'hg15' ) |
---|
| 696 | self.assertEqual ( len( self.get_history_as_data_list() ), 1 ) |
---|
| 697 | # Delete the history item |
---|
| 698 | self.delete_history_item( str( latest_hda.id ), strings_displayed=[ "Your history is empty" ] ) |
---|
| 699 | self.assertEqual ( len( self.get_history_as_data_list() ), 0 ) |
---|
| 700 | # Try deleting an invalid hid |
---|
| 701 | try: |
---|
| 702 | self.delete_history_item( 'XXX' ) |
---|
| 703 | raise AssertionError, "Inproperly able to delete hda_id 'XXX' which is not an integer" |
---|
| 704 | except: |
---|
| 705 | pass |
---|
| 706 | # Undelete the history item |
---|
| 707 | self.undelete_history_item( str( latest_hda.id ) ) |
---|
| 708 | self.home() |
---|
| 709 | self.visit_url( "%s/history/?show_deleted=False" % self.url ) |
---|
| 710 | self.check_page_for_string( '1.bed' ) |
---|
| 711 | self.check_page_for_string( 'hg15' ) |
---|
| 712 | self.delete_history( self.security.encode_id( latest_history.id ) ) |
---|
| 713 | def test_080_copying_history_items_between_histories( self ): |
---|
| 714 | """Testing copying history items between histories""" |
---|
| 715 | # logged in as admin_user |
---|
| 716 | self.new_history( name=urllib.quote( 'copy history items' ) ) |
---|
| 717 | history6 = sa_session.query( galaxy.model.History ) \ |
---|
| 718 | .filter( and_( galaxy.model.History.table.c.deleted==False, |
---|
| 719 | galaxy.model.History.table.c.user_id==admin_user.id ) ) \ |
---|
| 720 | .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ |
---|
| 721 | .first() |
---|
| 722 | assert history6 is not None, "Problem retrieving history6 from database" |
---|
| 723 | self.upload_file( '1.bed', dbkey='hg18' ) |
---|
| 724 | hda1 = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ |
---|
| 725 | .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ |
---|
| 726 | .first() |
---|
| 727 | assert hda1 is not None, "Problem retrieving hda1 from database" |
---|
| 728 | # We'll just test copying 1 hda |
---|
| 729 | source_dataset_ids=str( hda1.id ) |
---|
| 730 | # The valid list of target histories is only the user's active histories |
---|
| 731 | all_target_history_ids = [ str( hda.id ) for hda in admin_user.active_histories ] |
---|
| 732 | # Since history1 and history2 have been deleted, they should not be displayed in the list of target histories |
---|
| 733 | # on the copy_view.mako form |
---|
| 734 | deleted_history_ids = [ str( history1.id ), str( history2.id ) ] |
---|
| 735 | # Test copying to the current history |
---|
| 736 | target_history_ids=[ str( history6.id ) ] |
---|
| 737 | self.copy_history_item( source_dataset_ids=source_dataset_ids, |
---|
| 738 | target_history_ids=target_history_ids, |
---|
| 739 | all_target_history_ids=all_target_history_ids, |
---|
| 740 | deleted_history_ids=deleted_history_ids ) |
---|
| 741 | sa_session.refresh( history6 ) |
---|
| 742 | if len( history6.datasets ) != 2: |
---|
| 743 | raise AssertionError, "Copying hda1 to the current history failed, history 6 has %d datasets, but should have 2" % len( history6.datasets ) |
---|
| 744 | # Test copying 1 hda to another history |
---|
| 745 | self.new_history( name=urllib.quote( 'copy history items - 2' ) ) |
---|
| 746 | history7 = sa_session.query( galaxy.model.History ) \ |
---|
| 747 | .filter( and_( galaxy.model.History.table.c.deleted==False, |
---|
| 748 | galaxy.model.History.table.c.user_id==admin_user.id ) ) \ |
---|
| 749 | .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ |
---|
| 750 | .first() |
---|
| 751 | assert history7 is not None, "Problem retrieving history7 from database" |
---|
| 752 | # Switch back to our history from which we want to copy |
---|
| 753 | self.switch_history( id=self.security.encode_id( history6.id ), name=history6.name ) |
---|
| 754 | target_history_ids=[ str( history7.id ) ] |
---|
| 755 | all_target_history_ids = [ str( hda.id ) for hda in admin_user.active_histories ] |
---|
| 756 | # Test copying to the a history that is not the current history |
---|
| 757 | target_history_ids=[ str( history7.id ) ] |
---|
| 758 | self.copy_history_item( source_dataset_ids=source_dataset_ids, |
---|
| 759 | target_history_ids=target_history_ids, |
---|
| 760 | all_target_history_ids=all_target_history_ids, |
---|
| 761 | deleted_history_ids=deleted_history_ids ) |
---|
| 762 | # Switch to the history to which we copied |
---|
| 763 | self.switch_history( id=self.security.encode_id( history7.id ), name=history7.name ) |
---|
| 764 | self.check_history_for_string( hda1.name ) |
---|
| 765 | self.delete_history( self.security.encode_id( history6.id ) ) |
---|
| 766 | self.delete_history( self.security.encode_id( history7.id ) ) |
---|
| 767 | def test_085_reset_data_for_later_test_runs( self ): |
---|
| 768 | """Reseting data to enable later test runs to to be valid""" |
---|
| 769 | # logged in as admin_user |
---|
| 770 | # Clean up admin_user |
---|
| 771 | # Unshare history3 - shared with regular_user1, regular_user2, regular_user3 |
---|
| 772 | self.unshare_history( self.security.encode_id( history3.id ), |
---|
| 773 | self.security.encode_id( regular_user1.id ) ) |
---|
| 774 | self.unshare_history( self.security.encode_id( history3.id ), |
---|
| 775 | self.security.encode_id( regular_user2.id ) ) |
---|
| 776 | self.unshare_history( self.security.encode_id( history3.id ), |
---|
| 777 | self.security.encode_id( regular_user3.id ) ) |
---|
| 778 | # Unshare history4 - shared with regular_user2, regular_user3 |
---|
| 779 | self.unshare_history( self.security.encode_id( history4.id ), |
---|
| 780 | self.security.encode_id( regular_user2.id ) ) |
---|
| 781 | self.unshare_history( self.security.encode_id( history4.id ), |
---|
| 782 | self.security.encode_id( regular_user3.id ) ) |
---|
| 783 | # Unshare history5 - shared with regular_user1 |
---|
| 784 | self.unshare_history( self.security.encode_id( history5.id ), |
---|
| 785 | self.security.encode_id( regular_user1.id ) ) |
---|
| 786 | # Delete histories |
---|
| 787 | self.delete_history( id=self.security.encode_id( history3.id ) ) |
---|
| 788 | self.delete_history( id=self.security.encode_id( history3_clone2.id ) ) |
---|
| 789 | self.delete_history( id=self.security.encode_id( history3_clone3.id ) ) |
---|
| 790 | self.delete_history( id=self.security.encode_id( history4.id ) ) |
---|
| 791 | self.delete_history( id=self.security.encode_id( history5.id ) ) |
---|
| 792 | # Eliminate Sharing role for: test@bx.psu.edu, test2@bx.psu.edu |
---|
| 793 | self.mark_role_deleted( self.security.encode_id( sharing_role.id ), sharing_role.name ) |
---|
| 794 | self.purge_role( self.security.encode_id( sharing_role.id ), sharing_role.name ) |
---|
| 795 | # Manually delete the sharing role from the database |
---|
| 796 | sa_session.refresh( sharing_role ) |
---|
| 797 | sa_session.delete( sharing_role ) |
---|
| 798 | sa_session.flush() |
---|
| 799 | # Clean up regular_user_1 |
---|
| 800 | self.logout() |
---|
| 801 | self.login( email=regular_user1.email ) |
---|
| 802 | self.delete_history( id=self.security.encode_id( history3_clone1.id ) ) |
---|
| 803 | self.delete_history( id=self.security.encode_id( history5_clone1.id ) ) |
---|