我在这里采用的方法是使测试尽可能细化,并模拟其他调用。在这种情况下,你会想嘲笑你的
Pool
对象并验证它调用的是您期望的内容,而不是在测试期间实际依赖它来启动子进程。我是这么想的:
@patch('yourmodule.Pool')
def test_study_upload(self, mock_pool_init):
mock_pool_instance = mock_pool_init.return_value.__enter__.return_value
with Wrapper() as wrapper:
wrapper.upload_documents(documents)
# To get the upload file arg here, you'll need to either mock the partial call here,
# or actually call it and get the return value
mock_pool_instance.starmap.assert_called_once_with_args(upload_file, documents)
然后,您需要使用现有逻辑并分别测试上载文档功能:
@patch.object(Session, 'post')
def test_upload_file(self, post_mock):
response_mock = Mock()
post_mock.return_value = response_mock
response_mock.ok = True
with Wrapper() as wrapper:
wrapper.upload_document(document)
mc = post_mock.mock_calls
这使您可以覆盖创建和控制池的函数以及池实例调用的函数。注意,我没有测试这个,但我留下一些给你来填补空白,因为它看起来像是你原来问题中实际模块的缩写版本。
试试这个:
def test_study_upload(self):
def call_direct(func_var, documents):
return func_var(documents)
with patch('yourmodule.Pool.starmap', new=call_direct)
with Wrapper() as wrapper:
wrapper.upload_documents(documents)
这是修补starmap调用,以便它直接调用您传入的函数。它完全绕过了池;底线是您不能真正深入到那些由多处理创建的子流程中。