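Note that sending multiple messages (multicasting) can be implemented in a single blocking method through the NIO API (java.nio), without creating new threads. NIO is fairly complex, though.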
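To give an idea of what the NIO route involves, here is a minimal sketch, not a drop-in replacement for the Message class. It only covers the sending half (no receipts are read), and the class name NioBroadcaster and the method sendToAll are made up for illustration. A single Selector drives all connections from the calling thread, so the method blocks until every client has been sent the contents.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper: writes one line to many clients using only the calling thread.
final class NioBroadcaster {
    static void sendToAll(List<InetSocketAddress> addresses, String contents) throws IOException {
        byte[] payload = (contents + "\n").getBytes(StandardCharsets.UTF_8);
        List<SocketChannel> channels = new ArrayList<>();
        Selector selector = Selector.open();
        try {
            for (InetSocketAddress address : addresses) {
                SocketChannel channel = SocketChannel.open();
                channels.add(channel);
                channel.configureBlocking(false);
                // A non-blocking connect usually returns false and completes later.
                boolean connected = channel.connect(address);
                int ops = connected ? SelectionKey.OP_WRITE : SelectionKey.OP_CONNECT;
                // Each key gets its own buffer so a partial write can resume later.
                channel.register(selector, ops, ByteBuffer.wrap(payload));
            }
            int pending = channels.size();
            while (pending > 0) {
                selector.select(); // blocks until at least one channel is ready
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove();
                    SocketChannel channel = (SocketChannel) key.channel();
                    if (key.isConnectable() && channel.finishConnect()) {
                        key.interestOps(SelectionKey.OP_WRITE);
                    } else if (key.isWritable()) {
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        channel.write(buffer);
                        if (!buffer.hasRemaining()) {
                            channel.close(); // closing the channel also cancels its key
                            pending--;
                        }
                    }
                }
            }
        } finally {
            // Closing an already closed channel is a no-op.
            for (SocketChannel channel : channels) {
                channel.close();
            }
            selector.close();
        }
    }
}
```

Reading the display and read receipts would add OP_READ handling and per-client state to the same loop, which is where most of the complexity comes from.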
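I would start by writing the test, using a test-defined StatusListener implementation that stores every update event in a list. When the dispatch() method returns, the test can run assertions on the state of the event list.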
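Such a listener could look roughly like the sketch below. The Client, Status and StatusListener types here are minimal stand-ins that I inferred from how the code in this answer uses them, so the real ones may differ. RecordingStatusListener is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in types, assumed from their usage elsewhere in this answer.
final class Client {
    private final String address;
    Client(String address) { this.address = address; }
    String getAddress() { return address; }
}

final class Status {
    private final Client client;
    private final String event;
    Status(Client client, String event) { this.client = client; this.event = event; }
    Client getClient() { return client; }
    String getEvent() { return event; }
}

interface StatusListener {
    // Returning false tells the caller to stop waiting for further updates.
    boolean updateStatus(Status status);
}

// Hypothetical test-defined listener: records every update for later assertions.
final class RecordingStatusListener implements StatusListener {
    private final List<Status> events = new ArrayList<>();
    private final int expectedCount;

    RecordingStatusListener(int expectedCount) { this.expectedCount = expectedCount; }

    @Override
    public synchronized boolean updateStatus(Status status) {
        events.add(status);
        // Keep listening until the expected number of events has arrived.
        return events.size() < expectedCount;
    }

    // Returns a copy so the test can inspect it safely.
    synchronized List<Status> events() { return new ArrayList<>(events); }
}
```

A test would pass new RecordingStatusListener(3) to the Message constructor for one client, call dispatch(), and then assert on the contents of events().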
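Whether to use threads or NIO is an implementation detail of the Message class. So unless you don't mind coupling the test to that implementation detail, I suggest introducing a helper class that is responsible for sending the multiple asynchronous messages and for notifying the Message object of every asynchronous reply. You can then mock the helper class in the unit tests without coupling them to threads or NIO.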
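As a rough sketch of that design: all names below (AsyncSender, ResponseHandler, HelperBackedMessage, FakeSender) are hypothetical, and clients are reduced to plain address strings to keep the example short. The message object only talks to the helper interface, so a test can substitute a fake that replies synchronously.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper abstraction: hides whether threads or NIO do the sending.
interface AsyncSender {
    void sendToAll(List<String> addresses, String contents, ResponseHandler handler);
}

// Callback the helper invokes for every status change it observes.
interface ResponseHandler {
    void onResponse(String address, String event);
}

// Simplified message class that delegates all I/O to the helper.
final class HelperBackedMessage implements ResponseHandler {
    private final List<String> to;
    private final String contents;
    private final AsyncSender sender;
    private final List<String> log = new ArrayList<>();

    HelperBackedMessage(List<String> to, String contents, AsyncSender sender) {
        this.to = to;
        this.contents = contents;
        this.sender = sender;
    }

    void dispatch() { sender.sendToAll(to, contents, this); }

    // Synchronized because a real helper may call back from other threads.
    @Override
    public synchronized void onResponse(String address, String event) {
        log.add(address + ":" + event);
    }

    synchronized List<String> log() { return new ArrayList<>(log); }
}

// Test double: reports all three events for each client on the calling thread.
final class FakeSender implements AsyncSender {
    @Override
    public void sendToAll(List<String> addresses, String contents, ResponseHandler handler) {
        for (String address : addresses) {
            for (String event : new String[] {"DISPATCHED", "DISPLAYED", "READ"}) {
                handler.onResponse(address, event);
            }
        }
    }
}
```

With FakeSender in place, the unit test for the message logic is deterministic and involves no sockets. The thread-based or NIO-based implementation of AsyncSender can then be covered by its own, more integration-style tests.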
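I managed to implement a test for the case of sending a message to a single client. I also made some changes to the original production code, shown below: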
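import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class Message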
{
private static final int CLIENT_PORT = 8000;
// Externally provided:
private final Client[] to;
private final String contents;
private final StatusListener listener;
// Internal state:
private final List<Socket> clientConnections;
private final BlockingQueue<Status> statusQueue;
public Message(Client[] to, String contents, StatusListener listener)
{
this.to = to;
this.contents = contents;
this.listener = listener;
// Keep a list of socket references so that all threads can be closed:
clientConnections = Collections.synchronizedList(new ArrayList<Socket>());
// Initialise the statusQ for threads to report message status:
statusQueue = new ArrayBlockingQueue<Status>(to.length * 3);
}
public void dispatch()
{
// Dispatch to each client individually and wait for confirmation:
sendContentsToEachClientAsynchronously();
Status statusChangeReceived;
do {
try {
// Now, monitor queue and empty the queue as it fills up (consumer):
statusChangeReceived = statusQueue.take();
}
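catch (InterruptedException e) {
// Restore the interrupt flag so callers can still observe it, then stop waiting:
Thread.currentThread().interrupt();
break;
}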
}
while (listener.updateStatus(statusChangeReceived));
closeRemainingClientConnections();
}
private void closeRemainingClientConnections()
{
for (Socket connection : clientConnections) {
try {
connection.close();
}
catch (IOException ignore) {
// OK
}
}
clientConnections.clear();
}
private void sendContentsToEachClientAsynchronously()
{
for (Client client : to) {
System.out.println("Started new thread");
new Thread(new MessageDispatcher(client)).start();
}
}
// One MessageDispatcher per client.
private final class MessageDispatcher implements Runnable
{
private final Client client;
MessageDispatcher(Client client) { this.client = client; }
public void run()
{
try {
communicateWithClient();
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
private void communicateWithClient() throws IOException
{
// Open connection to client:
Socket connection = new Socket(client.getAddress(), CLIENT_PORT);
try {
// Add client connection to synchronized list:
clientConnections.add(connection);
sendMessage(connection.getOutputStream());
readRequiredReceipts(connection.getInputStream());
}
finally {
connection.close();
}
}
// Send message and confirm dispatch.
private void sendMessage(OutputStream output)
{
PrintWriter out = new PrintWriter(output, true);
out.println(contents);
statusQueue.add(new Status(client, "DISPATCHED"));
}
private void readRequiredReceipts(InputStream input) throws IOException
{
BufferedReader in = new BufferedReader(new InputStreamReader(input));
// Wait for display receipt:
in.readLine();
statusQueue.add(new Status(client, "DISPLAYED"));
// Wait for read receipt:
in.readLine();
statusQueue.add(new Status(client, "READ"));
}
}
}
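import static org.junit.Assert.*;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import mockit.Mock;
import mockit.MockUp;
import mockit.integration.junit4.JMockitTest;
import org.junit.Test;
public final class MessageTest extends JMockitTest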
{
static final String testContents = "hello there";
static final String[] expectedEvents = {"DISPATCHED", "DISPLAYED", "READ"};
@Test
public void testSendMessageToSingleClient()
{
final Client theClient = new Client("client1");
Client[] testClient = {theClient};
new MockUp<Socket>()
{
@Mock(invocations = 1)
void $init(String host, int port)
{
assertEquals(theClient.getAddress(), host);
assertTrue(port > 0);
}
@Mock(invocations = 1)
public OutputStream getOutputStream() { return new ByteArrayOutputStream(); }
@Mock(invocations = 1)
public InputStream getInputStream()
{
return new ByteArrayInputStream("reply1\nreply2\n".getBytes());
}
@Mock(minInvocations = 1) void close() {}
};
StatusListener listener = new MockUp<StatusListener>()
{
int eventIndex;
@Mock(invocations = 3)
boolean updateStatus(Status status)
{
assertSame(theClient, status.getClient());
assertEquals(expectedEvents[eventIndex++], status.getEvent());
return eventIndex < expectedEvents.length;
}
}.getMockInstance();
new Message(testClient, testContents, listener).dispatch();
}
}
The JMockit test above uses the MockUp class, which is not yet available in the latest release. It can be replaced with Mockit.setUpMock(Socket.class, new Object() { ... }), though.