protobuf: Support zero-copy encode/decode

We can do better with our encode/decode passes by taking better
advantage of the various APIs expose through CodedInputStream
and CodedOutputStream to support fewer copies for a number of
different buffer types, including java.nio.ByteBuffer and the
protobuf ByteString wrapper.

During encode we no longer recursively encode a nested object into
a temporary ByteString and append it to our own buffer, but instead
compute its size via the same logic as sizeof(), and then do the
encoding of the data.  Most nested messages will be fairly small
and thus quite easy to compute their nested size, so avoiding the
copy will improve encoding performance.

Also cleaned up the way sizeof generates, so decompiled code is
read correctly as "i += sz" rather than "i = sz + i".

Also fixed the decode function so that the input ByteBuffer is
correctly positioned at the end with no remaining input if we have
consumed everything in the stream.

Change-Id: Ie308c071c4302c55938a2e3cb014433d0c34e7aa
Signed-off-by: Shawn O. Pearce <sop@google.com>
diff --git a/src/main/java/com/google/gwtorm/nosql/generic/GenericAccess.java b/src/main/java/com/google/gwtorm/nosql/generic/GenericAccess.java
index 06ac20f..c58eb23 100644
--- a/src/main/java/com/google/gwtorm/nosql/generic/GenericAccess.java
+++ b/src/main/java/com/google/gwtorm/nosql/generic/GenericAccess.java
@@ -226,7 +226,7 @@
     writeNewIndexes(null, nObj);
 
     final byte[] key = dataRowKey(primaryKey(nObj));
-    db.insert(key, getObjectCodec().encode(nObj).toByteArray());
+    db.insert(key, getObjectCodec().encodeToByteString(nObj).toByteArray());
   }
 
   @Override
@@ -257,7 +257,7 @@
     }
 
     writeNewIndexes(oldObj, newObj);
-    db.upsert(key, getObjectCodec().encode(newObj).toByteArray());
+    db.upsert(key, getObjectCodec().encodeToByteString(newObj).toByteArray());
     pruneOldIndexes(oldObj, newObj);
   }
 
@@ -342,7 +342,7 @@
             } catch (OrmException err) {
               throw new IndexException(err);
             }
-            return getObjectCodec().encode(newObj).toByteArray();
+            return getObjectCodec().encodeToByteString(newObj).toByteArray();
 
           } else {
             res[0] = null;
@@ -435,7 +435,7 @@
     encodePrimaryKey(b, primaryKey(obj));
     final byte[] key = b.toByteArray();
 
-    return IndexRow.CODEC.encode(IndexRow.forKey(now, key)).toByteArray();
+    return IndexRow.CODEC.encodeToByteString(IndexRow.forKey(now, key)).toByteArray();
   }
 
   private static class IndexException extends RuntimeException {
diff --git a/src/main/java/com/google/gwtorm/nosql/generic/GenericSchema.java b/src/main/java/com/google/gwtorm/nosql/generic/GenericSchema.java
index 38568ff..a70fdae 100644
--- a/src/main/java/com/google/gwtorm/nosql/generic/GenericSchema.java
+++ b/src/main/java/com/google/gwtorm/nosql/generic/GenericSchema.java
@@ -99,7 +99,7 @@
           }
 
           res[0] = ctr.next();
-          return CounterShard.CODEC.encode(ctr).toByteArray();
+          return CounterShard.CODEC.encodeToByteString(ctr).toByteArray();
         }
       });
       return res[0];
diff --git a/src/main/java/com/google/gwtorm/protobuf/ByteBufferOutputStream.java b/src/main/java/com/google/gwtorm/protobuf/ByteBufferOutputStream.java
new file mode 100644
index 0000000..0316d31
--- /dev/null
+++ b/src/main/java/com/google/gwtorm/protobuf/ByteBufferOutputStream.java
@@ -0,0 +1,36 @@
+// Copyright 2010 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.gwtorm.protobuf;
+
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+class ByteBufferOutputStream extends OutputStream {
+  private final ByteBuffer buffer;
+
+  ByteBufferOutputStream(ByteBuffer buffer) {
+    this.buffer = buffer;
+  }
+
+  @Override
+  public void write(int b) {
+    buffer.put((byte) b);
+  }
+
+  @Override
+  public void write(byte[] src, int offset, int length) {
+    buffer.put(src, offset, length);
+  }
+}
diff --git a/src/main/java/com/google/gwtorm/protobuf/CodecGen.java b/src/main/java/com/google/gwtorm/protobuf/CodecGen.java
index eb26460..f7effde 100644
--- a/src/main/java/com/google/gwtorm/protobuf/CodecGen.java
+++ b/src/main/java/com/google/gwtorm/protobuf/CodecGen.java
@@ -32,7 +32,6 @@
 import org.objectweb.asm.Opcodes;
 import org.objectweb.asm.Type;
 
-import java.io.OutputStream;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -185,14 +184,17 @@
         sizeofMessage(sort(f.getNestedColumns()), mv, cgs);
         cgs.sizeVar = oldVar;
 
+        cgs.preinc();
         cgs.push(f.getColumnID());
-        cgs.inc("computeTagSize", Type.INT_TYPE);
+        cgs.doinc("computeTagSize", Type.INT_TYPE);
 
+        cgs.preinc();
         mv.visitVarInsn(ILOAD, msgVar);
-        cgs.inc("computeRawVarint32Size", Type.INT_TYPE);
+        cgs.doinc("computeRawVarint32Size", Type.INT_TYPE);
 
+        cgs.preinc();
         mv.visitVarInsn(ILOAD, msgVar);
-        cgs.inc();
+        cgs.doinc();
 
         cgs.freeLocal(msgVar);
         mv.visitLabel(end);
@@ -208,41 +210,47 @@
 
     switch (Type.getType(f.getPrimitiveType()).getSort()) {
       case Type.BOOLEAN:
+        cgs.preinc();
         cgs.push(f.getColumnID());
         cgs.pushFieldValue();
-        cgs.inc("computeBoolSize", Type.INT_TYPE, Type.BOOLEAN_TYPE);
+        cgs.doinc("computeBoolSize", Type.INT_TYPE, Type.BOOLEAN_TYPE);
         break;
 
       case Type.CHAR:
+        cgs.preinc();
         cgs.push(f.getColumnID());
         cgs.pushFieldValue();
-        cgs.inc("computeUInt32Size", Type.INT_TYPE, Type.INT_TYPE);
+        cgs.doinc("computeUInt32Size", Type.INT_TYPE, Type.INT_TYPE);
         break;
 
       case Type.BYTE:
       case Type.SHORT:
       case Type.INT:
+        cgs.preinc();
         cgs.push(f.getColumnID());
         cgs.pushFieldValue();
-        cgs.inc("computeSInt32Size", Type.INT_TYPE, Type.INT_TYPE);
+        cgs.doinc("computeSInt32Size", Type.INT_TYPE, Type.INT_TYPE);
         break;
 
       case Type.FLOAT:
+        cgs.preinc();
         cgs.push(f.getColumnID());
         cgs.pushFieldValue();
-        cgs.inc("computeFloatSize", Type.INT_TYPE, Type.FLOAT_TYPE);
+        cgs.doinc("computeFloatSize", Type.INT_TYPE, Type.FLOAT_TYPE);
         break;
 
       case Type.DOUBLE:
+        cgs.preinc();
         cgs.push(f.getColumnID());
         cgs.pushFieldValue();
-        cgs.inc("computeDoubleSize", Type.INT_TYPE, Type.DOUBLE_TYPE);
+        cgs.doinc("computeDoubleSize", Type.INT_TYPE, Type.DOUBLE_TYPE);
         break;
 
       case Type.LONG:
+        cgs.preinc();
         cgs.push(f.getColumnID());
         cgs.pushFieldValue();
-        cgs.inc("computeSInt64", Type.INT_TYPE, Type.LONG_TYPE);
+        cgs.doinc("computeSInt64", Type.INT_TYPE, Type.LONG_TYPE);
         break;
 
       case Type.ARRAY:
@@ -252,31 +260,36 @@
         mv.visitJumpInsn(IFNULL, end);
 
         if (f.getPrimitiveType() == byte[].class) {
+          cgs.preinc();
           cgs.push(f.getColumnID());
-          cgs.inc("computeTagSize", Type.INT_TYPE);
+          cgs.doinc("computeTagSize", Type.INT_TYPE);
 
+          cgs.preinc();
           cgs.pushFieldValue();
           mv.visitInsn(ARRAYLENGTH);
-          cgs.inc("computeRawVarint32Size", Type.INT_TYPE);
+          cgs.doinc("computeRawVarint32Size", Type.INT_TYPE);
 
+          cgs.preinc();
           cgs.pushFieldValue();
           mv.visitInsn(ARRAYLENGTH);
-          cgs.inc();
+          cgs.doinc();
 
         } else if (f.getPrimitiveType() == String.class) {
+          cgs.preinc();
           cgs.push(f.getColumnID());
           cgs.pushFieldValue();
-          cgs.inc("computeStringSize", Type.INT_TYPE, string);
+          cgs.doinc("computeStringSize", Type.INT_TYPE, string);
 
         } else if (f.getPrimitiveType() == java.sql.Timestamp.class
             || f.getPrimitiveType() == java.util.Date.class
             || f.getPrimitiveType() == java.sql.Date.class) {
+          cgs.preinc();
           cgs.push(f.getColumnID());
           cgs.pushFieldValue();
           String tsType = Type.getType(f.getPrimitiveType()).getInternalName();
           mv.visitMethodInsn(INVOKEVIRTUAL, tsType, "getTime", Type
               .getMethodDescriptor(Type.LONG_TYPE, new Type[] {}));
-          cgs.inc("computeFixed64Size", Type.INT_TYPE, Type.LONG_TYPE);
+          cgs.doinc("computeFixed64Size", Type.INT_TYPE, Type.LONG_TYPE);
 
         } else {
           throw new OrmException("Type " + f.getPrimitiveType()
@@ -295,7 +308,8 @@
   private void implementEncode() throws OrmException {
     final MethodVisitor mv =
         cw.visitMethod(ACC_PUBLIC, "encode", Type.getMethodDescriptor(
-            byteString, new Type[] {object}), null, new String[] {});
+            Type.VOID_TYPE, new Type[] {object, codedOutputStream}), null,
+            new String[] {});
     mv.visitCode();
     final EncodeCGS cgs = new EncodeCGS(mv);
     cgs.setEntityType(pojoType);
@@ -306,27 +320,17 @@
 
     encodeMessage(myFields, mv, cgs);
 
-    mv.visitInsn(ARETURN);
+    cgs.pushCodedOutputStream();
+    mv.visitMethodInsn(INVOKEVIRTUAL, codedOutputStream.getInternalName(),
+        "flush", Type.getMethodDescriptor(Type.VOID_TYPE, new Type[] {}));
+
+    mv.visitInsn(RETURN);
     mv.visitMaxs(-1, -1);
     mv.visitEnd();
   }
 
   private static void encodeMessage(final JavaColumnModel[] myFields,
       final MethodVisitor mv, final EncodeCGS cgs) throws OrmException {
-    final int oldVar = cgs.codedOutputStreamVar;
-    cgs.codedOutputStreamVar = cgs.newLocal();
-
-    final int strVar = cgs.newLocal();
-    mv.visitMethodInsn(INVOKESTATIC, byteString.getInternalName(), "newOutput",
-        Type.getMethodDescriptor(byteStringOutput, new Type[] {}));
-    mv.visitVarInsn(ASTORE, strVar);
-
-    mv.visitVarInsn(ALOAD, strVar);
-    mv.visitMethodInsn(INVOKESTATIC, codedOutputStream.getInternalName(),
-        "newInstance", Type.getMethodDescriptor(codedOutputStream,
-            new Type[] {Type.getType(OutputStream.class)}));
-    mv.visitVarInsn(ASTORE, cgs.codedOutputStreamVar);
-
     for (final JavaColumnModel f : myFields) {
       if (f.isNested()) {
         final Label end = new Label();
@@ -334,38 +338,30 @@
         cgs.pushFieldValue();
         mv.visitJumpInsn(IFNULL, end);
 
-        final int v = cgs.newLocal();
-        encodeMessage(sort(f.getNestedColumns()), mv, cgs);
-        mv.visitVarInsn(ASTORE, v);
-
-        mv.visitVarInsn(ALOAD, v);
-        mv.visitMethodInsn(INVOKEVIRTUAL, byteString.getInternalName(), "size",
-            Type.getMethodDescriptor(Type.INT_TYPE, new Type[] {}));
-        mv.visitJumpInsn(IFEQ, end);
-
         cgs.pushCodedOutputStream();
         cgs.push(f.getColumnID());
-        mv.visitVarInsn(ALOAD, v);
-        cgs.write("writeBytes", byteString);
+        cgs.push(WireFormat.FieldType.MESSAGE.getWireType());
+        mv.visitMethodInsn(INVOKEVIRTUAL, codedOutputStream.getInternalName(),
+            "writeTag", Type.getMethodDescriptor(Type.VOID_TYPE, new Type[] {
+                Type.INT_TYPE, Type.INT_TYPE}));
 
-        cgs.freeLocal(v);
+        cgs.push(0);
+        mv.visitVarInsn(ISTORE, cgs.sizeVar);
+        sizeofMessage(sort(f.getNestedColumns()), mv, cgs);
+
+        cgs.pushCodedOutputStream();
+        mv.visitVarInsn(ILOAD, cgs.sizeVar);
+        mv.visitMethodInsn(INVOKEVIRTUAL, codedOutputStream.getInternalName(),
+            "writeRawVarint32", Type.getMethodDescriptor(Type.VOID_TYPE,
+                new Type[] {Type.INT_TYPE}));
+
+        encodeMessage(sort(f.getNestedColumns()), mv, cgs);
+
         mv.visitLabel(end);
       } else {
         encodeScalar(mv, cgs, f);
       }
     }
-
-    cgs.pushCodedOutputStream();
-    mv.visitMethodInsn(INVOKEVIRTUAL, codedOutputStream.getInternalName(),
-        "flush", Type.getMethodDescriptor(Type.VOID_TYPE, new Type[] {}));
-
-    cgs.freeLocal(cgs.codedOutputStreamVar);
-    cgs.codedOutputStreamVar = oldVar;
-
-    mv.visitVarInsn(ALOAD, strVar);
-    mv.visitMethodInsn(INVOKEVIRTUAL, byteStringOutput.getInternalName(),
-        "toByteString", Type.getMethodDescriptor(byteString, new Type[] {}));
-    cgs.freeLocal(strVar);
   }
 
   private static void encodeScalar(final MethodVisitor mv, final EncodeCGS cgs,
@@ -663,22 +659,25 @@
     cgs.fieldSetEnd();
   }
 
-  private static final class SizeofCGS extends CodeGenSupport {
+  private static class SizeofCGS extends CodeGenSupport {
     int sizeVar;
 
-    private SizeofCGS(MethodVisitor method) {
+    SizeofCGS(MethodVisitor method) {
       super(method);
       sizeVar = newLocal();
     }
 
-    void inc(String name, Type... args) {
+    void doinc(String name, Type... args) {
       mv.visitMethodInsn(INVOKESTATIC, codedOutputStream.getInternalName(),
           name, Type.getMethodDescriptor(Type.INT_TYPE, args));
-      inc();
+      doinc();
     }
 
-    void inc() {
+    void preinc() {
       mv.visitVarInsn(ILOAD, sizeVar);
+    }
+
+    void doinc() {
       mv.visitInsn(IADD);
       mv.visitVarInsn(ISTORE, sizeVar);
     }
@@ -689,15 +688,13 @@
     }
   }
 
-  private static final class EncodeCGS extends CodeGenSupport {
-    int codedOutputStreamVar;
-
+  private static final class EncodeCGS extends SizeofCGS {
     private EncodeCGS(MethodVisitor method) {
       super(method);
     }
 
     void pushCodedOutputStream() {
-      mv.visitVarInsn(ALOAD, codedOutputStreamVar);
+      mv.visitVarInsn(ALOAD, 2);
     }
 
     void write(String name, Type arg) {
@@ -705,11 +702,6 @@
           name, Type.getMethodDescriptor(Type.VOID_TYPE, new Type[] {
               Type.INT_TYPE, arg}));
     }
-
-    @Override
-    public void pushEntity() {
-      mv.visitVarInsn(ALOAD, 1);
-    }
   }
 
   private static final class DecodeCGS extends CodeGenSupport {
diff --git a/src/main/java/com/google/gwtorm/protobuf/ProtobufCodec.java b/src/main/java/com/google/gwtorm/protobuf/ProtobufCodec.java
index ccb2247..006c780 100644
--- a/src/main/java/com/google/gwtorm/protobuf/ProtobufCodec.java
+++ b/src/main/java/com/google/gwtorm/protobuf/ProtobufCodec.java
@@ -17,6 +17,7 @@
 import com.google.gwtorm.client.Column;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
 
 import java.nio.ByteBuffer;
 
@@ -28,7 +29,56 @@
  */
 public abstract class ProtobufCodec<T> {
   /** Encode the object into an immutable byte string. */
-  public abstract ByteString encode(T obj);
+  public ByteString encodeToByteString(T obj) {
+    return ByteString.copyFrom(encodeToByteBuffer(obj));
+  }
+
+  /** Encode the object into an immutable byte string. */
+  public ByteBuffer encodeToByteBuffer(T obj) {
+    ByteBuffer data = ByteBuffer.allocate(sizeof(obj));
+    encode(obj, data);
+    data.flip();
+    return data;
+  }
+
+  /** Encode the object into a byte array. */
+  public byte[] encodeToByteArray(T obj) {
+    byte[] data = new byte[sizeof(obj)];
+    encode(obj, data);
+    return data;
+  }
+
+  /** Encode the object into a byte array. */
+  public void encode(T obj, final byte[] data) {
+    encode(obj, data, 0, data.length);
+  }
+
+  /** Encode the object into a byte array. */
+  public void encode(T obj, final byte[] data, int offset, int length) {
+    encode(obj, CodedOutputStream.newInstance(data, offset, length));
+  }
+
+  /** Encode the object into a ByteBuffer. */
+  public void encode(T obj, ByteBuffer buf) {
+    if (buf.hasArray()) {
+      CodedOutputStream out = CodedOutputStream.newInstance( //
+          buf.array(), //
+          buf.position(), //
+          buf.remaining());
+      encode(obj, out);
+      buf.position(buf.position() + (buf.remaining() - out.spaceLeft()));
+
+    } else {
+      encode(obj, CodedOutputStream.newInstance(newStream(buf)));
+    }
+  }
+
+  private static ByteBufferOutputStream newStream(ByteBuffer buf) {
+    return new ByteBufferOutputStream(buf);
+  }
+
+  /** Encode the object to the supplied output stream. */
+  protected abstract void encode(T obj, CodedOutputStream out);
 
   /** Compute the number of bytes of the encoded form of the object. */
   public abstract int sizeof(T obj);
@@ -39,20 +89,30 @@
   }
 
   /** Decode a byte array into an object instance. */
-  public T decode(byte[] buf) {
-    return decode(CodedInputStream.newInstance(buf));
+  public T decode(byte[] data) {
+    return decode(data, 0, data.length);
   }
 
-  /** Decode an object by reading it from the stream. */
-  protected abstract T decode(CodedInputStream in);
+  /** Decode a byte array into an object instance. */
+  public T decode(byte[] data, int offset, int length) {
+    return decode(CodedInputStream.newInstance(data, offset, length));
+  }
 
   /** Decode a byte buffer into an object instance. */
   public T decode(ByteBuffer buf) {
     if (buf.hasArray()) {
-      return decode(CodedInputStream.newInstance(buf.array(), buf.position(),
-          buf.remaining()));
+      CodedInputStream in = CodedInputStream.newInstance( //
+          buf.array(), //
+          buf.position(), //
+          buf.remaining());
+      T obj = decode(in);
+      buf.position(buf.position() + in.getTotalBytesRead());
+      return obj;
     } else {
       return decode(ByteString.copyFrom(buf));
     }
   }
+
+  /** Decode an object by reading it from the stream. */
+  protected abstract T decode(CodedInputStream in);
 }
diff --git a/src/test/java/com/google/gwtorm/protobuf/ProtobufEncoderTest.java b/src/test/java/com/google/gwtorm/protobuf/ProtobufEncoderTest.java
index 15fc754..856ecd6 100644
--- a/src/test/java/com/google/gwtorm/protobuf/ProtobufEncoderTest.java
+++ b/src/test/java/com/google/gwtorm/protobuf/ProtobufEncoderTest.java
@@ -20,33 +20,36 @@
 import junit.framework.TestCase;
 
 import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
 
 public class ProtobufEncoderTest extends TestCase {
+  private static final byte[] testingBin = new byte[] {
+  //
+      // name
+      0x0a, 0x09,
+      // name.name
+      0x0a, 0x07, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67, //
+      // age
+      0x10, (byte) 0x96, 0x01, //
+      // registered (true)
+      0x18, 0x01 //
+      //
+      };
+
   @SuppressWarnings("cast")
   public void testPerson() throws UnsupportedEncodingException {
     final ProtobufCodec<TestPerson> e = CodecFactory.encoder(TestPerson.class);
-    final byte[] bin = new byte[] {
-    //
-        // name
-        0x0a, 0x09,
-        // name.name
-        0x0a, 0x07, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67, //
-        // age
-        0x10, (byte) 0x96, 0x01, //
-        // registered (true)
-        0x18, 0x01 //
-        //
-        };
-    TestPerson p = e.decode(bin);
+
+    TestPerson p = e.decode(testingBin);
     assertNotNull(p);
     assertTrue(p instanceof TestPerson);
     assertEquals("testing", p.name());
     assertEquals(75, p.age());
     assertTrue(p.isRegistered());
 
-    final byte[] out = e.encode(p).toByteArray();
-    assertEquals(new String(bin, "ISO-8859-1"), new String(out, "ISO-8859-1"));
-    assertEquals(bin.length, e.sizeof(p));
+    final byte[] out = e.encodeToByteArray(p);
+    assertEquals(asString(testingBin), asString(out));
+    assertEquals(testingBin.length, e.sizeof(p));
   }
 
   public void testAddress() {
@@ -60,11 +63,69 @@
     TestPerson p = new TestPerson(k, 42);
     TestAddress b = new TestAddress(new TestAddress.Key(k, "ny"), "ny");
 
-    byte[] act = e.encode(b).toByteArray();
+    byte[] act = e.encodeToByteArray(b);
 
     TestAddress c = e.decode(act);
     assertEquals(c.location(), b.location());
     assertEquals(c.city(), b.city());
     assertEquals(c.key(), b.key());
   }
+
+  public void testDecodeEmptiesByteBuffer() {
+    ProtobufCodec<TestPerson> e = CodecFactory.encoder(TestPerson.class);
+    ByteBuffer buf = ByteBuffer.wrap(testingBin);
+    TestPerson p = e.decode(buf);
+    assertEquals(0, buf.remaining());
+    assertEquals(testingBin.length, buf.position());
+  }
+
+  public void testEncodeFillsByteBuffer() throws UnsupportedEncodingException {
+    ProtobufCodec<TestPerson> e = CodecFactory.encoder(TestPerson.class);
+
+    TestPerson p = new TestPerson(new TestPerson.Key("testing"), 75);
+    p.register();
+
+    int sz = e.sizeof(p);
+    assertEquals(testingBin.length, sz);
+
+    ByteBuffer buf = ByteBuffer.allocate(sz);
+    e.encode(p, buf);
+    assertEquals(0, buf.remaining());
+    assertEquals(sz, buf.position());
+
+    buf.flip();
+    byte[] act = new byte[sz];
+    buf.get(act);
+
+    assertEquals(asString(testingBin), asString(act));
+  }
+
+  public void testEncodeNonArrayByteBuffer()
+      throws UnsupportedEncodingException {
+    ProtobufCodec<TestPerson> e = CodecFactory.encoder(TestPerson.class);
+
+    TestPerson p = new TestPerson(new TestPerson.Key("testing"), 75);
+    p.register();
+
+    int sz = e.sizeof(p);
+    assertEquals(testingBin.length, sz);
+
+    ByteBuffer buf = ByteBuffer.allocateDirect(sz);
+    assertFalse("direct ByteBuffer has no array", buf.hasArray());
+
+    e.encode(p, buf);
+    assertEquals(0, buf.remaining());
+    assertEquals(sz, buf.position());
+
+    buf.flip();
+    byte[] act = new byte[sz];
+    buf.get(act);
+
+    assertEquals(asString(testingBin), asString(act));
+  }
+
+  private static String asString(byte[] bin)
+      throws UnsupportedEncodingException {
+    return new String(bin, "ISO-8859-1");
+  }
 }