import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.Random; import async.ASync; import async.ASyncEvent; import async.ASyncServiceInspector; import async.ASyncStartedCallback; import async.AddressPortPair; import async.common.Message; import async.common.MessageQueue; import async.common.ProcessReadyCallback; import asynctcp.ASyncTCP; import asynctcp.ASyncTCPListener; import asynctcp.Connection; import asynctcp.ConnectionListener; import asynctcp.IncomingConnection; import asynctcp.OutgoingConnection; import asynctcp.Connection.ConnectionState; /** * An example application using ASync to send 64KB from port 7000 to port 7001 * via TCP/IP with only one thread. * * @author Michael Parker */ public class ASyncExample { // the size of the message to send public static final int MESSAGE_SIZE = 64 * 1024; public static void main(String[] args) throws UnknownHostException { final CountingBarrier cb = new CountingBarrier(); // create the ASync instance final ASync as = new ASync(); // create two ASyncTCP objects: one sending, one receiving final ASyncTCP tcp1 = new ASyncTCP(7000); final ASyncTCP tcp2 = new ASyncTCP(7001); final AddressPortPair app2 = new AddressPortPair( InetAddress.getLocalHost(), 7001); // add ASyncTCP objects as services to ASync instance as.addService(tcp1.getService()); as.addService(tcp2.getService()); // pause main thread until ASync instance started cb.set(1); new Thread() { public void run() { // make callback to invoke once ASync instance starts running final ASyncStartedCallback assc = new ASyncStartedCallback() { public void nowStarted(ASyncServiceInspector assi) { // notify main thread that services are started cb.touch(); } }; as.run(assc); } }.start(); cb.await(); // allocate byte buffer, fill with random bytes to send Random rng = new Random(); final ByteBuffer sent_bytes = ByteBuffer.allocate(MESSAGE_SIZE); rng.nextBytes(sent_bytes.array()); // create byte buffer to hold received bytes final ByteBuffer received_bytes = ByteBuffer.allocate(MESSAGE_SIZE); cb.set(2); as.addEvent(new ASyncEvent() { public void run(long curr_time) { // set listener to notify when sender connects tcp2.getConnectionManager().setListener(new ASyncTCPListener() { public void newConnection(IncomingConnection ic) { // notify main thread that connected to tcp1 cb.touch(); } public void nowRunning() { // do nothing } public void serviceStopped() { // do nothing } public void providerStopped() { // do nothing } }); // open outgoing connection to tcp2 OutgoingConnection oc = tcp1.getConnectionManager().openConnection( app2, 10 * 1000L); oc.setListener(new ConnectionListener() { public void changeStatus(ConnectionState prev_state, ConnectionState curr_state, Connection conn) { if ((prev_state == ConnectionState.CONNECTING) && (curr_state == ConnectionState.CONNECTED)) { // notify main thread that now connected to tcp2 cb.touch(); } } public boolean bytesReceived(ByteBuffer bb, Connection conn, ProcessReadyCallback prc) { return true; } }); } }); // wait until both sides have established a connection cb.await(); cb.set(1); as.addEvent(new ASyncEvent() { public void run(long curr_time) { // set listener to save message at receiver IncomingConnection ic = tcp2.getConnectionManager().getIncomingConnections().get(0); ic.setListener(new ConnectionListener() { public void changeStatus(ConnectionState prev_state, ConnectionState curr_state, Connection conn) { } public boolean bytesReceived(ByteBuffer bb, Connection conn, ProcessReadyCallback prc) { received_bytes.put(bb); if (!received_bytes.hasRemaining()) { // message received because buffer is full, notify main thread cb.touch(); } return true; } }); // enqueue message at sender OutgoingConnection oc = tcp1.getConnectionManager().getOutgoingConnections().get(0); MessageQueue mq = oc.getMessageQueue(); mq.enqueue(new Message() { public boolean serialize(ByteBuffer bb, ProcessReadyCallback prc) { if (!sent_bytes.hasRemaining()) { // alrady sent all bytes, so return without filling buffer return true; } // put next bytes to send into buffer int bb_remaining = bb.remaining(); if (bb_remaining < sent_bytes.remaining()) { int prev_limit = sent_bytes.limit(); sent_bytes.limit(sent_bytes.position() + bb_remaining); bb.put(sent_bytes); sent_bytes.limit(prev_limit); } else { bb.put(sent_bytes); } return true; } }); // disconnect immediately once message is sent oc.disconnect(); } }); // wait for the receiver to get the message in its entirety cb.await(); // stop ASync instance as.stop(); // assure that sent bytes and received bytes are equal sent_bytes.position(0); received_bytes.flip(); while (received_bytes.hasRemaining()) { if (received_bytes.get() != sent_bytes.get()) { System.err.println("Error: Incorrectly sent " + MESSAGE_SIZE + " bytes."); return; } } System.out.println("Correctly sent " + MESSAGE_SIZE + " bytes."); } }