diff --git clients/csharp/LICENSE clients/csharp/LICENSE
index ef51da2..78f9f30 100644
--- clients/csharp/LICENSE
+++ clients/csharp/LICENSE
@@ -187,7 +187,7 @@ APPENDIX: How to apply the Apache License to your work.
same "printed page" as the copyright notice for easier
identification within third-party archives.
-Copyright [yyyy] [name of copyright owner]
+Copyright 2011 LinkedIn
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -199,4 +199,4 @@ Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
-limitations under the License.
+limitations under the License.
\ No newline at end of file
diff --git clients/csharp/src/Kafka/Kafka.Client/AbstractRequest.cs clients/csharp/src/Kafka/Kafka.Client/AbstractRequest.cs
index 0c1be48..582e9aa 100644
--- clients/csharp/src/Kafka/Kafka.Client/AbstractRequest.cs
+++ clients/csharp/src/Kafka/Kafka.Client/AbstractRequest.cs
@@ -1,4 +1,19 @@
-using System;
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
diff --git clients/csharp/src/Kafka/Kafka.Client/Kafka.Client.csproj clients/csharp/src/Kafka/Kafka.Client/Kafka.Client.csproj
index c251b13..20bb284 100644
--- clients/csharp/src/Kafka/Kafka.Client/Kafka.Client.csproj
+++ clients/csharp/src/Kafka/Kafka.Client/Kafka.Client.csproj
@@ -12,6 +12,7 @@
Kafka.Client
v4.0
512
+ 0
true
@@ -21,6 +22,32 @@
DEBUG;TRACE
prompt
4
+ False
+ False
+ True
+ False
+ False
+ False
+ False
+ False
+ False
+ False
+ False
+ True
+ False
+ False
+ False
+
+
+
+
+
+
+ False
+ Full
+ %28none%29
+ 0
+ true
pdbonly
@@ -29,41 +56,175 @@
TRACE
prompt
4
+ false
+
+
+ true
+ bin\Integration\
+ DEBUG;TRACE
+ full
+ AnyCPU
+ prompt
+ false
+ true
+ true
+ false
+ True
+ False
+ True
+ False
+ False
+ False
+ False
+ False
+ False
+ False
+ False
+ True
+ False
+ False
+ False
+
+
+
+
+
+
+ False
+ Full
+ %28none%29
+ 0
+
+ ..\..\..\lib\log4Net\log4net.dll
+
+
-
-
-
-
+
+
+ ..\..\..\lib\zookeeper\ZooKeeperNet.dll
+
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Code
+
+
+ Code
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ ZooKeeperClient.cs
+
+
+
-
+
\ No newline at end of file
diff --git clients/csharp/src/Kafka/Kafka.Client/KafkaConnection.cs clients/csharp/src/Kafka/Kafka.Client/KafkaConnection.cs
index 6b9989a..845692c 100644
--- clients/csharp/src/Kafka/Kafka.Client/KafkaConnection.cs
+++ clients/csharp/src/Kafka/Kafka.Client/KafkaConnection.cs
@@ -1,19 +1,28 @@
-using System;
-using System.Net.Sockets;
-using System.Threading;
-using Kafka.Client.Request;
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
namespace Kafka.Client
{
- ///
- /// Callback made when a message request is finished being sent asynchronously.
- ///
- ///
- /// Must be of type and represents the type of message
- /// sent to Kafka.
- ///
- /// The request that was sent to the server.
- public delegate void MessageSent(RequestContext request) where T : AbstractRequest;
+ using System;
+ using System.IO;
+ using System.Net.Sockets;
+ using System.Threading;
+ using Kafka.Client.Producers.Async;
+ using Kafka.Client.Requests;
+ using Kafka.Client.Utils;
///
/// Manages connections to the Kafka.
@@ -23,7 +32,9 @@ namespace Kafka.Client
///
/// TCP client that connects to the server.
///
- private TcpClient _client;
+ private readonly TcpClient client;
+
+ private volatile bool disposed;
///
/// Initializes a new instance of the KafkaConnection class.
@@ -36,7 +47,7 @@ namespace Kafka.Client
Port = port;
// connection opened
- _client = new TcpClient(server, port);
+ client = new TcpClient(server, port);
}
///
@@ -59,6 +70,7 @@ namespace Kafka.Client
/// The data read from the server as a byte array.
public byte[] Read(int size)
{
+ this.EnsuresNotDisposed();
return Read(size, Timeout.Infinite);
}
@@ -70,123 +82,186 @@ namespace Kafka.Client
/// The data read from the server as a byte array.
public byte[] Read(int size, int readTimeout)
{
- NetworkStream stream = _client.GetStream();
+ this.EnsuresNotDisposed();
+ byte[] bytes;
+ NetworkStream stream = client.GetStream();
stream.ReadTimeout = readTimeout;
-
- byte[] bytes = new byte[size];
- bool readComplete = false;
int numberOfTries = 0;
- while (!readComplete && numberOfTries < 1000)
+ int readSize = size;
+ if (client.ReceiveBufferSize < size)
{
- if (stream.DataAvailable)
- {
- stream.Read(bytes, 0, size);
- readComplete = true;
- }
- else
+ readSize = client.ReceiveBufferSize;
+ }
+
+ using (var ms = new MemoryStream())
+ {
+ var bytesToRead = new byte[client.ReceiveBufferSize];
+
+ while (true)
{
- // wait until the server is ready to send some stuff.
- numberOfTries++;
- Thread.Sleep(10);
+ int numberOfBytesRead = stream.Read(bytesToRead, 0, readSize);
+ if (numberOfBytesRead > 0)
+ {
+ ms.Write(bytesToRead, 0, numberOfBytesRead);
+ }
+
+ if (ms.Length >= size)
+ {
+ break;
+ }
+
+ if (numberOfBytesRead == 0)
+ {
+ if (numberOfTries >= 1000)
+ {
+ break;
+ }
+
+ numberOfTries++;
+ Thread.Sleep(10);
+ }
}
- }
-
+
+ bytes = new byte[ms.Length];
+ ms.Seek(0, SeekOrigin.Begin);
+ ms.Read(bytes, 0, (int)ms.Length);
+ }
+
return bytes;
}
+
+ ///
+ /// Writes a producer request to the server asynchronously.
+ ///
+ /// The request to make.
+ public void BeginWrite(ProducerRequest request)
+ {
+ this.EnsuresNotDisposed();
+ Guard.Assert(() => request != null);
+ NetworkStream stream = client.GetStream();
+ byte[] data = request.RequestBuffer.GetBuffer();
+ stream.BeginWrite(data, 0, data.Length, asyncResult => ((NetworkStream)asyncResult.AsyncState).EndWrite(asyncResult), stream);
+ }
///
/// Writes a producer request to the server asynchronously.
///
/// The request to make.
/// The code to execute once the message is completely sent.
+ ///
+ /// Do not dispose connection till callback is invoked,
+ /// otherwise underlying network stream will be closed.
+ ///
public void BeginWrite(ProducerRequest request, MessageSent callback)
{
- NetworkStream stream = _client.GetStream();
- RequestContext ctx = new RequestContext(stream, request);
+ this.EnsuresNotDisposed();
+ Guard.Assert(() => request != null);
+ if (callback == null)
+ {
+ this.BeginWrite(request);
+ return;
+ }
- byte[] data = request.GetBytes();
+ NetworkStream stream = client.GetStream();
+ var ctx = new RequestContext(stream, request);
+
+ byte[] data = request.RequestBuffer.GetBuffer();
stream.BeginWrite(
- data,
- 0,
- data.Length,
+ data,
+ 0,
+ data.Length,
delegate(IAsyncResult asyncResult)
- {
- RequestContext context = (RequestContext)asyncResult.AsyncState;
-
- if (callback != null)
{
+ var context = (RequestContext)asyncResult.AsyncState;
callback(context);
- }
-
- context.NetworkStream.EndWrite(asyncResult);
- context.NetworkStream.Dispose();
- },
+ context.NetworkStream.EndWrite(asyncResult);
+ },
ctx);
}
///
- /// Writes a producer request to the server asynchronously.
+ /// Writes a producer request to the server.
///
///
- /// The default callback simply calls the . This is
- /// basically a low level fire and forget call.
+ /// Write timeout is defaulted to infitite.
///
- /// The data to send to the server.
- public void BeginWrite(byte[] data)
+ /// The to send to the server.
+ public void Write(ProducerRequest request)
{
- NetworkStream stream = _client.GetStream();
- stream.BeginWrite(data, 0, data.Length, (asyncResult) => ((NetworkStream)asyncResult.AsyncState).EndWrite(asyncResult), stream);
+ this.EnsuresNotDisposed();
+ Guard.Assert(() => request != null);
+ this.Write(request.RequestBuffer.GetBuffer(), Timeout.Infinite);
}
///
- /// Writes data to the server.
+ /// Writes a multi-producer request to the server.
///
///
- /// Write timeout is defaulted to infinite.
+ /// Write timeout is defaulted to infitite.
///
+ /// The to send to the server.
+ public void Write(MultiProducerRequest request)
+ {
+ this.EnsuresNotDisposed();
+ Guard.Assert(() => request != null);
+ this.Write(request.RequestBuffer.GetBuffer(), Timeout.Infinite);
+ }
+
+ ///
+ /// Writes data to the server.
+ ///
/// The data to write to the server.
- public void Write(byte[] data)
+ /// The amount of time that a write operation blocks waiting for data.
+ private void Write(byte[] data, int writeTimeout)
{
- Write(data, Timeout.Infinite);
+ NetworkStream stream = this.client.GetStream();
+ stream.WriteTimeout = writeTimeout;
+
+ // Send the message to the connected TcpServer.
+ stream.Write(data, 0, data.Length);
}
///
- /// Writes a producer request to the server.
+ /// Writes a fetch request to the server.
///
///
/// Write timeout is defaulted to infitite.
///
- /// The to send to the server.
- public void Write(ProducerRequest request)
+ /// The to send to the server.
+ public void Write(FetchRequest request)
{
- Write(request.GetBytes());
+ this.EnsuresNotDisposed();
+ Guard.Assert(() => request != null);
+ this.Write(request.RequestBuffer.GetBuffer(), Timeout.Infinite);
}
///
- /// Writes a multi-producer request to the server.
+ /// Writes a multifetch request to the server.
///
///
/// Write timeout is defaulted to infitite.
///
- /// The to send to the server.
- public void Write(MultiProducerRequest request)
+ /// The to send to the server.
+ public void Write(MultiFetchRequest request)
{
- Write(request.GetBytes());
+ this.EnsuresNotDisposed();
+ Guard.Assert(() => request != null);
+ this.Write(request.RequestBuffer.GetBuffer(), Timeout.Infinite);
}
///
- /// Writes data to the server.
+ /// Writes a offset request to the server.
///
- /// The data to write to the server.
- /// The amount of time that a write operation blocks waiting for data.
- public void Write(byte[] data, int writeTimeout)
+ ///
+ /// Write timeout is defaulted to infitite.
+ ///
+ /// The to send to the server.
+ public void Write(OffsetRequest request)
{
- NetworkStream stream = _client.GetStream();
- stream.WriteTimeout = writeTimeout;
-
- // Send the message to the connected TcpServer.
- stream.Write(data, 0, data.Length);
+ this.EnsuresNotDisposed();
+ Guard.Assert(() => request != null);
+ this.Write(request.RequestBuffer.GetBuffer(), Timeout.Infinite);
}
///
@@ -194,10 +269,27 @@ namespace Kafka.Client
///
public void Dispose()
{
- if (_client != null)
+ if (this.disposed)
+ {
+ return;
+ }
+
+ this.disposed = true;
+ if (this.client != null)
+ {
+ this.client.GetStream().Close();
+ this.client.Close();
+ }
+ }
+
+ ///
+ /// Ensures that object was not disposed
+ ///
+ private void EnsuresNotDisposed()
+ {
+ if (this.disposed)
{
- _client.GetStream().Close();
- _client.Close();
+ throw new ObjectDisposedException(this.GetType().Name);
}
}
}
diff --git clients/csharp/src/Kafka/Kafka.Client/KafkaException.cs clients/csharp/src/Kafka/Kafka.Client/KafkaException.cs
index 5d84031..29ebe00 100644
--- clients/csharp/src/Kafka/Kafka.Client/KafkaException.cs
+++ clients/csharp/src/Kafka/Kafka.Client/KafkaException.cs
@@ -1,4 +1,20 @@
-using System;
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
diff --git clients/csharp/src/Kafka/Kafka.Client/Message.cs clients/csharp/src/Kafka/Kafka.Client/Message.cs
index 6d04192..ad8b087 100644
--- clients/csharp/src/Kafka/Kafka.Client/Message.cs
+++ clients/csharp/src/Kafka/Kafka.Client/Message.cs
@@ -1,4 +1,20 @@
-using System;
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
using System.Linq;
using System.Text;
using Kafka.Client.Util;
diff --git clients/csharp/src/Kafka/Kafka.Client/Producer.cs clients/csharp/src/Kafka/Kafka.Client/Producer.cs
index e763f76..a7c7e33 100644
--- clients/csharp/src/Kafka/Kafka.Client/Producer.cs
+++ clients/csharp/src/Kafka/Kafka.Client/Producer.cs
@@ -1,4 +1,20 @@
-using System;
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
using System.Collections.Generic;
using System.Text;
using Kafka.Client.Request;
diff --git clients/csharp/src/Kafka/Kafka.Client/Properties/AssemblyInfo.cs clients/csharp/src/Kafka/Kafka.Client/Properties/AssemblyInfo.cs
index e799765..6a9cf37 100644
--- clients/csharp/src/Kafka/Kafka.Client/Properties/AssemblyInfo.cs
+++ clients/csharp/src/Kafka/Kafka.Client/Properties/AssemblyInfo.cs
@@ -1,36 +1,34 @@
-using System.Reflection;
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
-// General Information about an assembly is controlled through the following
-// set of attributes. Change these attribute values to modify the information
-// associated with an assembly.
[assembly: AssemblyTitle("Kafka.Client")]
-[assembly: AssemblyDescription("")]
-[assembly: AssemblyConfiguration("")]
-[assembly: AssemblyCompany("Microsoft")]
+[assembly: AssemblyDescription(".NET Client for Kafka")]
+[assembly: AssemblyCompany("ExactTarget")]
[assembly: AssemblyProduct("Kafka.Client")]
-[assembly: AssemblyCopyright("Copyright © Microsoft 2011")]
-[assembly: AssemblyTrademark("")]
-[assembly: AssemblyCulture("")]
+[assembly: AssemblyCopyright("Copyright © ExactTarget 2011")]
-// Setting ComVisible to false makes the types in this assembly not visible
-// to COM components. If you need to access a type in this assembly from
-// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]
-
-// The following GUID is for the ID of the typelib if this project is exposed to COM
-[assembly: Guid("93d702e5-9998-49a8-8c16-5b04b3ba55c1")]
-
-// Version information for an assembly consists of the following four values:
-//
-// Major Version
-// Minor Version
-// Build Number
-// Revision
-//
-// You can specify all the values or you can default the Build and Revision Numbers
-// by using the '*' as shown below:
-// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.0.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]
+[assembly: InternalsVisibleTo("Kafka.Client.Tests")]
+[assembly: InternalsVisibleTo("Kafka.Client.IntegrationTests")]
+[assembly: CLSCompliant(true)]
+
diff --git clients/csharp/src/Kafka/Kafka.Client/Request/FetchRequest.cs clients/csharp/src/Kafka/Kafka.Client/Request/FetchRequest.cs
index 5994f8f..20b67b4 100644
--- clients/csharp/src/Kafka/Kafka.Client/Request/FetchRequest.cs
+++ clients/csharp/src/Kafka/Kafka.Client/Request/FetchRequest.cs
@@ -1,4 +1,20 @@
-using System;
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
diff --git clients/csharp/src/Kafka/Kafka.Client/Request/MultiFetchRequest.cs clients/csharp/src/Kafka/Kafka.Client/Request/MultiFetchRequest.cs
index ff45c96..ca6d018 100644
--- clients/csharp/src/Kafka/Kafka.Client/Request/MultiFetchRequest.cs
+++ clients/csharp/src/Kafka/Kafka.Client/Request/MultiFetchRequest.cs
@@ -1,4 +1,20 @@
-using System;
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
diff --git clients/csharp/src/Kafka/Kafka.Client/Request/MultiProducerRequest.cs clients/csharp/src/Kafka/Kafka.Client/Request/MultiProducerRequest.cs
index 0cd8dda..1d25162 100644
--- clients/csharp/src/Kafka/Kafka.Client/Request/MultiProducerRequest.cs
+++ clients/csharp/src/Kafka/Kafka.Client/Request/MultiProducerRequest.cs
@@ -1,4 +1,20 @@
-using System;
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
diff --git clients/csharp/src/Kafka/Kafka.Client/Request/OffsetRequest.cs clients/csharp/src/Kafka/Kafka.Client/Request/OffsetRequest.cs
index 539b16a..85bd00c 100644
--- clients/csharp/src/Kafka/Kafka.Client/Request/OffsetRequest.cs
+++ clients/csharp/src/Kafka/Kafka.Client/Request/OffsetRequest.cs
@@ -1,4 +1,20 @@
-using System;
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
diff --git clients/csharp/src/Kafka/Kafka.Client/Request/ProducerRequest.cs clients/csharp/src/Kafka/Kafka.Client/Request/ProducerRequest.cs
index 4ed4a1c..4ea9eab 100644
--- clients/csharp/src/Kafka/Kafka.Client/Request/ProducerRequest.cs
+++ clients/csharp/src/Kafka/Kafka.Client/Request/ProducerRequest.cs
@@ -1,4 +1,20 @@
-using System;
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
using System.Collections.Generic;
using System.Text;
using Kafka.Client.Util;
diff --git clients/csharp/src/Kafka/Kafka.Client/RequestContext.cs clients/csharp/src/Kafka/Kafka.Client/RequestContext.cs
index 43c0a17..64ef2b2 100644
--- clients/csharp/src/Kafka/Kafka.Client/RequestContext.cs
+++ clients/csharp/src/Kafka/Kafka.Client/RequestContext.cs
@@ -1,7 +1,24 @@
-using System.Net.Sockets;
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
namespace Kafka.Client
{
+ using System.Net.Sockets;
+ using Kafka.Client.Requests;
+
///
/// The context of a request made to Kafka.
///
diff --git clients/csharp/src/Kafka/Kafka.Client/RequestType.cs clients/csharp/src/Kafka/Kafka.Client/RequestType.cs
index 948ae37..eacfa46 100644
--- clients/csharp/src/Kafka/Kafka.Client/RequestType.cs
+++ clients/csharp/src/Kafka/Kafka.Client/RequestType.cs
@@ -1,4 +1,20 @@
-namespace Kafka.Client
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Kafka.Client
{
///
/// Requests types for Kafka
diff --git clients/csharp/src/Kafka/Kafka.Client/Util/BitWorks.cs clients/csharp/src/Kafka/Kafka.Client/Util/BitWorks.cs
index b0fe243..c52637a 100644
--- clients/csharp/src/Kafka/Kafka.Client/Util/BitWorks.cs
+++ clients/csharp/src/Kafka/Kafka.Client/Util/BitWorks.cs
@@ -1,4 +1,20 @@
-using System;
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
diff --git clients/csharp/src/Kafka/Kafka.Client/Util/Crc32.cs clients/csharp/src/Kafka/Kafka.Client/Util/Crc32.cs
index 075df71..4ea49d8 100644
--- clients/csharp/src/Kafka/Kafka.Client/Util/Crc32.cs
+++ clients/csharp/src/Kafka/Kafka.Client/Util/Crc32.cs
@@ -1,4 +1,20 @@
-//
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//
using System;
using System.Security.Cryptography;
diff --git clients/csharp/src/Kafka/Kafka.sln clients/csharp/src/Kafka/Kafka.sln
index e6da1a5..88a08bb 100644
--- clients/csharp/src/Kafka/Kafka.sln
+++ clients/csharp/src/Kafka/Kafka.sln
@@ -12,19 +12,26 @@ EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
+ Integration|Any CPU = Integration|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}.Integration|Any CPU.ActiveCfg = Integration|Any CPU
+ {A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}.Integration|Any CPU.Build.0 = Integration|Any CPU
{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}.Release|Any CPU.Build.0 = Release|Any CPU
{9BA1A0BF-B207-4A11-8883-5F64B113C07D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9BA1A0BF-B207-4A11-8883-5F64B113C07D}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {9BA1A0BF-B207-4A11-8883-5F64B113C07D}.Integration|Any CPU.ActiveCfg = Integration|Any CPU
+ {9BA1A0BF-B207-4A11-8883-5F64B113C07D}.Integration|Any CPU.Build.0 = Integration|Any CPU
{9BA1A0BF-B207-4A11-8883-5F64B113C07D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9BA1A0BF-B207-4A11-8883-5F64B113C07D}.Release|Any CPU.Build.0 = Release|Any CPU
{AF29C330-49BD-4648-B692-882E922C435B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{AF29C330-49BD-4648-B692-882E922C435B}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {AF29C330-49BD-4648-B692-882E922C435B}.Integration|Any CPU.ActiveCfg = Integration|Any CPU
+ {AF29C330-49BD-4648-B692-882E922C435B}.Integration|Any CPU.Build.0 = Integration|Any CPU
{AF29C330-49BD-4648-B692-882E922C435B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AF29C330-49BD-4648-B692-882E922C435B}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
diff --git clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Kafka.Client.IntegrationTests.csproj clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Kafka.Client.IntegrationTests.csproj
index e6cdc3c..e8c6204 100644
--- clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Kafka.Client.IntegrationTests.csproj
+++ clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Kafka.Client.IntegrationTests.csproj
@@ -12,6 +12,7 @@
Kafka.Client.IntegrationTests
v4.0
512
+ 0
true
@@ -21,6 +22,32 @@
DEBUG;TRACE
prompt
4
+ true
+ False
+ False
+ True
+ False
+ False
+ False
+ False
+ False
+ False
+ False
+ False
+ True
+ False
+ False
+ False
+
+
+
+
+
+
+ False
+ Full
+ %28none%29
+ 0
pdbonly
@@ -29,23 +56,52 @@
TRACE
prompt
4
+ false
+
+
+ true
+ full
+ false
+ bin\Integration\
+ DEBUG;TRACE
+ prompt
+ 4
+ false
+
+
+
+
+ ..\..\..\..\lib\log4Net\log4net.dll
+
False
..\..\..\..\lib\nunit\2.5.9\nunit.framework.dll
+
-
-
-
-
+
+ ..\..\..\..\lib\zookeeper\ZooKeeperNet.dll
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -53,12 +109,29 @@
Kafka.Client
+
+
+ Designer
+
+
+ Designer
+
+
+ Always
+
+
+ Designer
+
+
-
\ No newline at end of file
diff --git clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/KafkaIntegrationTest.cs clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/KafkaIntegrationTest.cs
index 2e8ab5c..c19a514 100644
--- clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/KafkaIntegrationTest.cs
+++ clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/KafkaIntegrationTest.cs
@@ -1,28 +1,55 @@
-using System;
-using System.Collections.Generic;
-using System.Text;
-using System.Threading;
-using Kafka.Client.Request;
-using NUnit.Framework;
-
-namespace Kafka.Client.Tests
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.IntegrationTests
{
+ using System;
+ using System.Collections.Generic;
+ using System.Linq;
+ using System.Text;
+ using System.Threading;
+ using Kafka.Client.Cfg;
+ using Kafka.Client.Consumers;
+ using Kafka.Client.Messages;
+ using Kafka.Client.Producers.Async;
+ using Kafka.Client.Producers.Sync;
+ using Kafka.Client.Requests;
+ using NUnit.Framework;
+
///
/// Contains tests that go all the way to Kafka and back.
///
[TestFixture]
- [Ignore("Requires a Kafka server running to execute")]
- public class KafkaIntegrationTest
+ public class KafkaIntegrationTest : IntegrationFixtureBase
{
///
- /// Kafka server to test against.
+ /// Kafka Client configuration
///
- private static readonly string KafkaServer = "192.168.50.203";
+ private static KafkaClientConfiguration clientConfig;
///
- /// Port of the Kafka server to test against.
+ /// Maximum amount of time to wait trying to get a specific test message from Kafka server (in miliseconds)
///
- private static readonly int KafkaPort = 9092;
+ private static readonly int MaxTestWaitTimeInMiliseconds = 5000;
+
+ [TestFixtureSetUp]
+ public void SetUp()
+ {
+ clientConfig = KafkaClientConfiguration.GetConfiguration();
+ }
///
/// Sends a pair of message to Kafka.
@@ -33,36 +60,131 @@ namespace Kafka.Client.Tests
string payload1 = "kafka 1.";
byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
Message msg1 = new Message(payloadData1);
-
+
string payload2 = "kafka 2.";
byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
Message msg2 = new Message(payloadData2);
- Producer producer = new Producer(KafkaServer, KafkaPort);
- producer.Send("test", 0, new List { msg1, msg2 });
+ var config = new SyncProducerConfig(clientConfig);
+ var producer = new SyncProducer(config);
+ var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List() { msg1, msg2 });
+ producer.Send(producerRequest);
+ }
+
+ ///
+ /// Sends a message with long topic to Kafka.
+ ///
+ [Test]
+ public void ProducerSendsMessageWithLongTopic()
+ {
+ Message msg = new Message(Encoding.UTF8.GetBytes("test message"));
+ string topic = "ThisIsAVeryLongTopicThisIsAVeryLongTopicThisIsAVeryLongTopicThisIsAVeryLongTopicThisIsAVeryLongTopicThisIsAVeryLongTopic";
+ var config = new SyncProducerConfig(clientConfig);
+ var producer = new SyncProducer(config);
+ var producerRequest = new ProducerRequest(topic, 0, new List() { msg });
+ producer.Send(producerRequest);
}
///
- /// Asynchronously sends a pair of message to Kafka.
+ /// Asynchronously sends many random messages to Kafka
///
[Test]
- public void ProducerSendsMessageAsynchronously()
+ public void AsyncProducerSendsManyLongRandomMessages()
+ {
+ List messages = GenerateRandomTextMessages(50);
+
+ var config = new AsyncProducerConfig(clientConfig);
+
+ var producer = new AsyncProducer(config);
+ producer.Send(CurrentTestTopic, 0, messages);
+ }
+
+ ///
+ /// Asynchronously sends few short fixed messages to Kafka
+ ///
+ [Test]
+ public void AsyncProducerSendsFewShortFixedMessages()
+ {
+ List messages = new List()
+ {
+ new Message(Encoding.UTF8.GetBytes("Async Test Message 1")),
+ new Message(Encoding.UTF8.GetBytes("Async Test Message 2")),
+ new Message(Encoding.UTF8.GetBytes("Async Test Message 3")),
+ new Message(Encoding.UTF8.GetBytes("Async Test Message 4"))
+ };
+
+ var config = new AsyncProducerConfig(clientConfig);
+
+ var producer = new AsyncProducer(config);
+ producer.Send(CurrentTestTopic, 0, messages);
+ }
+
+ ///
+ /// Asynchronously sends few short fixed messages to Kafka in separate send actions
+ ///
+ [Test]
+ public void AsyncProducerSendsFewShortFixedMessagesInSeparateSendActions()
+ {
+ var config = new AsyncProducerConfig(clientConfig);
+ using (var producer = new AsyncProducer(config))
+ {
+ ProducerRequest req1 = new ProducerRequest(
+ CurrentTestTopic,
+ 0,
+ new List() { new Message(Encoding.UTF8.GetBytes("Async Test Message 1")) });
+ producer.Send(req1);
+
+ ProducerRequest req2 = new ProducerRequest(
+ CurrentTestTopic,
+ 0,
+ new List() { new Message(Encoding.UTF8.GetBytes("Async Test Message 2")) });
+ producer.Send(req2);
+
+ ProducerRequest req3 = new ProducerRequest(
+ CurrentTestTopic,
+ 0,
+ new List() { new Message(Encoding.UTF8.GetBytes("Async Test Message 3")) });
+ producer.Send(req3);
+ }
+ }
+
+ [Test]
+ public void AsyncProducerSendsMessageWithCallbackClass()
{
- bool waiting = true;
+ List messages = new List()
+ {
+ new Message(Encoding.UTF8.GetBytes("Async Test Message 1")),
+ };
+ var config = new AsyncProducerConfig(clientConfig);
+ TestCallbackHandler myHandler = new TestCallbackHandler();
+ var producer = new AsyncProducer(config, myHandler);
+ producer.Send(CurrentTestTopic, 0, messages);
+ Thread.Sleep(1000);
+ Assert.IsTrue(myHandler.WasRun);
+ }
- List messages = GenerateRandomMessages(50);
+ [Test]
+ public void AsyncProducerSendsMessageWithCallback()
+ {
+ List messages = new List()
+ {
+ new Message(Encoding.UTF8.GetBytes("Async Test Message 1")),
+ };
+ var config = new AsyncProducerConfig(clientConfig);
+ TestCallbackHandler myHandler = new TestCallbackHandler();
+ var producer = new AsyncProducer(config);
+ producer.Send(CurrentTestTopic, 0, messages, myHandler.Handle);
+ Thread.Sleep(1000);
+ Assert.IsTrue(myHandler.WasRun);
+ }
- Producer producer = new Producer(KafkaServer, KafkaPort);
- producer.SendAsync(
- "test",
- 0,
- messages,
- (requestContext) => { waiting = false; });
+ private class TestCallbackHandler : ICallbackHandler
+ {
+ public bool WasRun { get; private set; }
- while (waiting)
+ public void Handle(RequestContext context)
{
- Console.WriteLine("Keep going...");
- Thread.Sleep(10);
+ WasRun = true;
}
}
@@ -74,15 +196,15 @@ namespace Kafka.Client.Tests
{
List requests = new List
{
- new ProducerRequest("test", 0, new List { new Message(Encoding.UTF8.GetBytes("1: " + DateTime.UtcNow)) }),
- new ProducerRequest("test", 0, new List { new Message(Encoding.UTF8.GetBytes("2: " + DateTime.UtcNow)) }),
- new ProducerRequest("testa", 0, new List { new Message(Encoding.UTF8.GetBytes("3: " + DateTime.UtcNow)) }),
- new ProducerRequest("testa", 0, new List { new Message(Encoding.UTF8.GetBytes("4: " + DateTime.UtcNow)) })
+ new ProducerRequest(CurrentTestTopic, 0, new List { new Message(Encoding.UTF8.GetBytes("1: " + DateTime.UtcNow)) }),
+ new ProducerRequest(CurrentTestTopic, 0, new List { new Message(Encoding.UTF8.GetBytes("2: " + DateTime.UtcNow)) }),
+ new ProducerRequest(CurrentTestTopic, 0, new List { new Message(Encoding.UTF8.GetBytes("3: " + DateTime.UtcNow)) }),
+ new ProducerRequest(CurrentTestTopic, 0, new List { new Message(Encoding.UTF8.GetBytes("4: " + DateTime.UtcNow)) })
};
- MultiProducerRequest request = new MultiProducerRequest(requests);
- Producer producer = new Producer(KafkaServer, KafkaPort);
- producer.Send(request);
+ var config = new SyncProducerConfig(clientConfig);
+ var producer = new SyncProducer(config);
+ producer.MultiSend(requests);
}
///
@@ -93,12 +215,14 @@ namespace Kafka.Client.Tests
{
ProducerSendsMessage();
- Consumer consumer = new Consumer(KafkaServer, KafkaPort);
- List messages = consumer.Consume("test", 0, 0);
-
- foreach (Message msg in messages)
+ ConsumerConfig config = new ConsumerConfig(clientConfig);
+ IConsumer consumer = new Kafka.Client.Consumers.Consumer(config);
+ FetchRequest request = new FetchRequest(CurrentTestTopic, 0, 0);
+ BufferedMessageSet response = consumer.Fetch(request);
+ Assert.NotNull(response);
+ foreach (var message in response.Messages)
{
- Console.WriteLine(msg);
+ Console.WriteLine(message);
}
}
@@ -110,19 +234,19 @@ namespace Kafka.Client.Tests
{
ProducerSendMultiRequest();
- Consumer consumer = new Consumer(KafkaServer, KafkaPort);
+ ConsumerConfig config = new ConsumerConfig(clientConfig);
+ IConsumer cons = new Consumers.Consumer(config);
MultiFetchRequest request = new MultiFetchRequest(new List
{
- new FetchRequest("test", 0, 0),
- new FetchRequest("test", 0, 0),
- new FetchRequest("testa", 0, 0)
+ new FetchRequest(CurrentTestTopic, 0, 0),
+ new FetchRequest(CurrentTestTopic, 0, 0),
+ new FetchRequest(CurrentTestTopic + "2", 0, 0)
});
- List> messages = consumer.Consume(request);
-
- for (int ix = 0; ix < messages.Count; ix++)
+ IList response = cons.MultiFetch(request);
+ for (int ix = 0; ix < response.Count; ix++)
{
- List messageSet = messages[ix];
+ IEnumerable messageSet = response[ix].Messages;
Console.WriteLine(string.Format("Request #{0}-->", ix));
foreach (Message msg in messageSet)
{
@@ -137,9 +261,10 @@ namespace Kafka.Client.Tests
[Test]
public void ConsumerGetsOffsets()
{
- OffsetRequest request = new OffsetRequest("test", 0, DateTime.Now.AddHours(-24).Ticks, 10);
+ OffsetRequest request = new OffsetRequest(CurrentTestTopic, 0, DateTime.Now.AddHours(-24).Ticks, 10);
- Consumer consumer = new Consumer(KafkaServer, KafkaPort);
+ ConsumerConfig config = new ConsumerConfig(clientConfig);
+ IConsumer consumer = new Consumers.Consumer(config);
IList list = consumer.GetOffsetsBefore(request);
foreach (long l in list)
@@ -149,6 +274,171 @@ namespace Kafka.Client.Tests
}
///
+ /// Synchronous Producer sends a single simple message and then a consumer consumes it
+ ///
+ [Test]
+ public void ProducerSendsAndConsumerReceivesSingleSimpleMessage()
+ {
+ Message sourceMessage = new Message(Encoding.UTF8.GetBytes("test message"));
+
+ var config = new SyncProducerConfig(clientConfig);
+ var producer = new SyncProducer(config);
+ var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List() { sourceMessage });
+
+ long currentOffset = TestHelper.GetCurrentKafkaOffset(CurrentTestTopic, clientConfig);
+
+ producer.Send(producerRequest);
+
+ ConsumerConfig consumerConfig = new ConsumerConfig(clientConfig);
+ IConsumer consumer = new Consumers.Consumer(consumerConfig);
+ FetchRequest request = new FetchRequest(CurrentTestTopic, 0, currentOffset);
+
+ BufferedMessageSet response;
+ int totalWaitTimeInMiliseconds = 0;
+ int waitSingle = 100;
+ while (true)
+ {
+ Thread.Sleep(waitSingle);
+ response = consumer.Fetch(request);
+ if (response != null && response.Messages.Count() > 0)
+ {
+ break;
+ }
+ else
+ {
+ totalWaitTimeInMiliseconds += waitSingle;
+ if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
+ {
+ break;
+ }
+ }
+ }
+
+ Assert.NotNull(response);
+ Assert.AreEqual(1, response.Messages.Count());
+ Message resultMessage = response.Messages.First();
+ Assert.AreEqual(sourceMessage.ToString(), resultMessage.ToString());
+ }
+
+ ///
+ /// Asynchronous Producer sends a single simple message and then a consumer consumes it
+ ///
+ [Test]
+ public void AsyncProducerSendsAndConsumerReceivesSingleSimpleMessage()
+ {
+ Message sourceMessage = new Message(Encoding.UTF8.GetBytes("test message"));
+
+ var config = new AsyncProducerConfig(clientConfig);
+ var producer = new AsyncProducer(config);
+ var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List() { sourceMessage });
+
+ long currentOffset = TestHelper.GetCurrentKafkaOffset(CurrentTestTopic, clientConfig);
+
+ producer.Send(producerRequest);
+
+ ConsumerConfig consumerConfig = new ConsumerConfig(clientConfig);
+ IConsumer consumer = new Consumers.Consumer(consumerConfig);
+ FetchRequest request = new FetchRequest(CurrentTestTopic, 0, currentOffset);
+
+ BufferedMessageSet response;
+ int totalWaitTimeInMiliseconds = 0;
+ int waitSingle = 100;
+ while (true)
+ {
+ Thread.Sleep(waitSingle);
+ response = consumer.Fetch(request);
+ if (response != null && response.Messages.Count() > 0)
+ {
+ break;
+ }
+ else
+ {
+ totalWaitTimeInMiliseconds += waitSingle;
+ if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
+ {
+ break;
+ }
+ }
+ }
+
+ Assert.NotNull(response);
+ Assert.AreEqual(1, response.Messages.Count());
+ Message resultMessage = response.Messages.First();
+ Assert.AreEqual(sourceMessage.ToString(), resultMessage.ToString());
+ }
+
+ ///
+ /// Synchronous producer sends a multi request and a consumer receives it from to Kafka.
+ ///
+ [Test]
+ public void ProducerSendsAndConsumerReceivesMultiRequest()
+ {
+ string testTopic1 = CurrentTestTopic + "1";
+ string testTopic2 = CurrentTestTopic + "2";
+ string testTopic3 = CurrentTestTopic + "3";
+
+ Message sourceMessage1 = new Message(Encoding.UTF8.GetBytes("1: TestMessage"));
+ Message sourceMessage2 = new Message(Encoding.UTF8.GetBytes("2: TestMessage"));
+ Message sourceMessage3 = new Message(Encoding.UTF8.GetBytes("3: TestMessage"));
+ Message sourceMessage4 = new Message(Encoding.UTF8.GetBytes("4: TestMessage"));
+
+ List requests = new List
+ {
+ new ProducerRequest(testTopic1, 0, new List { sourceMessage1 }),
+ new ProducerRequest(testTopic1, 0, new List { sourceMessage2 }),
+ new ProducerRequest(testTopic2, 0, new List { sourceMessage3 }),
+ new ProducerRequest(testTopic3, 0, new List { sourceMessage4 })
+ };
+
+ var config = new SyncProducerConfig(clientConfig);
+ var producer = new SyncProducer(config);
+
+ long currentOffset1 = TestHelper.GetCurrentKafkaOffset(testTopic1, clientConfig);
+ long currentOffset2 = TestHelper.GetCurrentKafkaOffset(testTopic2, clientConfig);
+ long currentOffset3 = TestHelper.GetCurrentKafkaOffset(testTopic3, clientConfig);
+
+ producer.MultiSend(requests);
+
+ ConsumerConfig consumerConfig = new ConsumerConfig(clientConfig);
+ IConsumer consumer = new Consumers.Consumer(consumerConfig);
+ MultiFetchRequest request = new MultiFetchRequest(new List
+ {
+ new FetchRequest(testTopic1, 0, currentOffset1),
+ new FetchRequest(testTopic2, 0, currentOffset2),
+ new FetchRequest(testTopic3, 0, currentOffset3)
+ });
+ IList messageSets;
+ int totalWaitTimeInMiliseconds = 0;
+ int waitSingle = 100;
+ while (true)
+ {
+ Thread.Sleep(waitSingle);
+ messageSets = consumer.MultiFetch(request);
+ if (messageSets.Count > 2 && messageSets[0].Messages.Count() > 0 && messageSets[1].Messages.Count() > 0 && messageSets[2].Messages.Count() > 0)
+ {
+ break;
+ }
+ else
+ {
+ totalWaitTimeInMiliseconds += waitSingle;
+ if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
+ {
+ break;
+ }
+ }
+ }
+
+ Assert.AreEqual(3, messageSets.Count);
+ Assert.AreEqual(2, messageSets[0].Messages.Count());
+ Assert.AreEqual(1, messageSets[1].Messages.Count());
+ Assert.AreEqual(1, messageSets[2].Messages.Count());
+ Assert.AreEqual(sourceMessage1.ToString(), messageSets[0].Messages.First().ToString());
+ Assert.AreEqual(sourceMessage2.ToString(), messageSets[0].Messages.Skip(1).First().ToString());
+ Assert.AreEqual(sourceMessage3.ToString(), messageSets[1].Messages.First().ToString());
+ Assert.AreEqual(sourceMessage4.ToString(), messageSets[2].Messages.First().ToString());
+ }
+
+ ///
/// Gererates a randome list of messages.
///
/// The number of messages to generate.
@@ -177,5 +467,42 @@ namespace Kafka.Client.Tests
return randBytes;
}
+
+ ///
+ /// Gererates a randome list of text messages.
+ ///
+ /// The number of messages to generate.
+ /// A list of random text messages.
+ private static List GenerateRandomTextMessages(int numberOfMessages)
+ {
+ List messages = new List();
+ for (int ix = 0; ix < numberOfMessages; ix++)
+ {
+ ////messages.Add(new Message(GenerateRandomBytes(10000)));
+ messages.Add(new Message(Encoding.UTF8.GetBytes(GenerateRandomMessage(10000))));
+ }
+
+ return messages;
+ }
+
+ ///
+ /// Generate a random message text.
+ ///
+ /// Length of the message string.
+ /// Random message string.
+ private static string GenerateRandomMessage(int length)
+ {
+ StringBuilder builder = new StringBuilder();
+ Random random = new Random();
+ char ch;
+ for (int i = 0; i < length; i++)
+ {
+ ch = Convert.ToChar(Convert.ToInt32(
+ Math.Floor((26 * random.NextDouble()) + 65)));
+ builder.Append(ch);
+ }
+
+ return builder.ToString();
+ }
}
}
diff --git clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Properties/AssemblyInfo.cs clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Properties/AssemblyInfo.cs
index 9d6e9b7..b72cec6 100644
--- clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Properties/AssemblyInfo.cs
+++ clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Properties/AssemblyInfo.cs
@@ -1,5 +1,20 @@
-using System.Reflection;
-using System.Runtime.CompilerServices;
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Reflection;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following
diff --git clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Kafka.Client.Tests.csproj clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Kafka.Client.Tests.csproj
index e23dd3d..2b0511d 100644
--- clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Kafka.Client.Tests.csproj
+++ clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Kafka.Client.Tests.csproj
@@ -12,6 +12,7 @@
Kafka.Client.Tests
v4.0
512
+ 0
true
@@ -21,6 +22,32 @@
DEBUG;TRACE
prompt
4
+ true
+ False
+ False
+ True
+ False
+ False
+ False
+ False
+ False
+ False
+ False
+ False
+ True
+ False
+ False
+ False
+
+
+
+
+
+
+ False
+ Full
+ %28none%29
+ 0
pdbonly
@@ -29,22 +56,36 @@
TRACE
prompt
4
+ false
+
+
+ true
+ bin\Integration\
+ DEBUG;TRACE
+ full
+ AnyCPU
+ prompt
+ true
+ true
+ true
+ false
+
+ ..\..\..\..\lib\log4Net\log4net.dll
+
False
..\..\..\..\lib\nunit\2.5.9\nunit.framework.dll
-
-
-
-
+
+
@@ -59,12 +100,9 @@
Kafka.Client
+
+
+
-
+
\ No newline at end of file
diff --git clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageTests.cs clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageTests.cs
index 3715fdc..5c5d95b 100644
--- clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageTests.cs
+++ clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageTests.cs
@@ -1,24 +1,48 @@
-using System;
-using System.Linq;
-using System.Text;
-using Kafka.Client.Util;
-using NUnit.Framework;
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
namespace Kafka.Client.Tests
{
+ using System;
+ using System.IO;
+ using System.Linq;
+ using System.Text;
+ using Kafka.Client.Messages;
+ using Kafka.Client.Utils;
+ using NUnit.Framework;
+
///
/// Tests for the class.
///
[TestFixture]
public class MessageTests
{
+ private readonly int ChecksumPartLength = 4;
+
+ private readonly int MagicNumberPartOffset = 0;
+ private readonly int ChecksumPartOffset = 1;
+ private readonly int DataPartOffset = 5;
+
///
/// Demonstrates a properly parsed message.
///
[Test]
public void ParseFromValid()
{
- Crc32 crc32 = new Crc32();
+ Crc32Hasher crc32 = new Crc32Hasher();
string payload = "kafka";
byte magic = 0;
@@ -48,21 +72,59 @@ namespace Kafka.Client.Tests
{
Message message = new Message(new byte[10], (byte)245);
- byte[] bytes = message.GetBytes();
-
- Assert.IsNotNull(bytes);
+ MemoryStream ms = new MemoryStream();
+ message.WriteTo(ms);
// len(payload) + 1 + 4
- Assert.AreEqual(15, bytes.Length);
+ Assert.AreEqual(15, ms.Length);
// first 4 bytes = the magic number
- Assert.AreEqual((byte)245, bytes[0]);
+ Assert.AreEqual((byte)245, ms.ToArray()[0]);
// next 4 bytes = the checksum
- Assert.IsTrue(message.Checksum.SequenceEqual(bytes.Skip(1).Take(4).ToArray()));
+ Assert.IsTrue(message.Checksum.SequenceEqual(ms.ToArray().Skip(1).Take(4).ToArray()));
// remaining bytes = the payload
- Assert.AreEqual(10, bytes.Skip(5).ToArray().Length);
+ Assert.AreEqual(10, ms.ToArray().Skip(5).ToArray().Length);
+ }
+
+ [Test]
+ public void WriteToValidSequenceForDefaultConstructor()
+ {
+ byte[] messageBytes = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
+ Message message = new Message(messageBytes);
+ MemoryStream ms = new MemoryStream();
+ message.WriteTo(ms);
+
+ Assert.AreEqual(0, ms.ToArray()[MagicNumberPartOffset]); // default magic number should be 0
+
+ byte[] checksumPart = new byte[ChecksumPartLength];
+ Array.Copy(ms.ToArray(), ChecksumPartOffset, checksumPart, 0, ChecksumPartLength);
+ Assert.AreEqual(Crc32Hasher.Compute(messageBytes), checksumPart);
+
+ byte[] dataPart = new byte[messageBytes.Length];
+ Array.Copy(ms.ToArray(), DataPartOffset, dataPart, 0, messageBytes.Length);
+ Assert.AreEqual(messageBytes, dataPart);
+ }
+
+ [Test]
+ public void WriteToValidSequenceForCustomConstructor()
+ {
+ byte[] messageBytes = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
+ byte[] customChecksum = new byte[] { 3, 4, 5, 6 };
+ Message message = new Message(messageBytes, (byte)33, customChecksum);
+ MemoryStream ms = new MemoryStream();
+ message.WriteTo(ms);
+
+ Assert.AreEqual((byte)33, ms.ToArray()[MagicNumberPartOffset]);
+
+ byte[] checksumPart = new byte[ChecksumPartLength];
+ Array.Copy(ms.ToArray(), ChecksumPartOffset, checksumPart, 0, ChecksumPartLength);
+ Assert.AreEqual(customChecksum, checksumPart);
+
+ byte[] dataPart = new byte[messageBytes.Length];
+ Array.Copy(ms.ToArray(), DataPartOffset, dataPart, 0, messageBytes.Length);
+ Assert.AreEqual(messageBytes, dataPart);
}
}
}
diff --git clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Properties/AssemblyInfo.cs clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Properties/AssemblyInfo.cs
index 8df5b34..d07a28f 100644
--- clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Properties/AssemblyInfo.cs
+++ clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Properties/AssemblyInfo.cs
@@ -1,5 +1,20 @@
-using System.Reflection;
-using System.Runtime.CompilerServices;
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Reflection;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following
diff --git clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/FetchRequestTests.cs clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/FetchRequestTests.cs
index 7587b71..7d9387f 100644
--- clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/FetchRequestTests.cs
+++ clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/FetchRequestTests.cs
@@ -1,12 +1,29 @@
-using System;
-using System.Linq;
-using System.Text;
-using Kafka.Client.Request;
-using Kafka.Client.Util;
-using NUnit.Framework;
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
-namespace Kafka.Client.Request.Tests
+namespace Kafka.Client.Tests.Request
{
+ using System;
+ using System.IO;
+ using System.Linq;
+ using System.Text;
+ using Kafka.Client.Requests;
+ using Kafka.Client.Utils;
+ using NUnit.Framework;
+
///
/// Tests for the class.
///
@@ -14,36 +31,6 @@ namespace Kafka.Client.Request.Tests
public class FetchRequestTests
{
///
- /// Tests a valid request.
- ///
- [Test]
- public void IsValidTrue()
- {
- FetchRequest request = new FetchRequest("topic", 1, 10L, 100);
- Assert.IsTrue(request.IsValid());
- }
-
- ///
- /// Tests a invalid request with no topic.
- ///
- [Test]
- public void IsValidNoTopic()
- {
- FetchRequest request = new FetchRequest(" ", 1, 10L, 100);
- Assert.IsFalse(request.IsValid());
- }
-
- ///
- /// Tests a invalid request with no topic.
- ///
- [Test]
- public void IsValidNulltopic()
- {
- FetchRequest request = new FetchRequest(null, 1, 10L, 100);
- Assert.IsFalse(request.IsValid());
- }
-
- ///
/// Tests to ensure that the request follows the expected structure.
///
[Test]
@@ -55,7 +42,9 @@ namespace Kafka.Client.Request.Tests
// REQUEST TYPE ID + TOPIC LENGTH + TOPIC + PARTITION + OFFSET + MAX SIZE
int requestSize = 2 + 2 + topicName.Length + 4 + 8 + 4;
- byte[] bytes = request.GetBytes();
+ MemoryStream ms = new MemoryStream();
+ request.WriteTo(ms);
+ byte[] bytes = ms.ToArray();
Assert.IsNotNull(bytes);
// add 4 bytes for the length of the message at the beginning
@@ -65,7 +54,7 @@ namespace Kafka.Client.Request.Tests
Assert.AreEqual(25, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Take(4).ToArray()), 0));
// next 2 bytes = the request type
- Assert.AreEqual((short)RequestType.Fetch, BitConverter.ToInt16(BitWorks.ReverseBytes(bytes.Skip(4).Take(2).ToArray()), 0));
+ Assert.AreEqual((short)RequestTypes.Fetch, BitConverter.ToInt16(BitWorks.ReverseBytes(bytes.Skip(4).Take(2).ToArray()), 0));
// next 2 bytes = the topic length
Assert.AreEqual((short)topicName.Length, BitConverter.ToInt16(BitWorks.ReverseBytes(bytes.Skip(6).Take(2).ToArray()), 0));
diff --git clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/MultiFetchRequestTests.cs clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/MultiFetchRequestTests.cs
index 00ec7dc..2b848ed 100644
--- clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/MultiFetchRequestTests.cs
+++ clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/MultiFetchRequestTests.cs
@@ -1,12 +1,29 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using Kafka.Client.Request;
-using Kafka.Client.Util;
-using NUnit.Framework;
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
-namespace Kafka.Client.Request.Tests
+namespace Kafka.Client.Tests.Request
{
+ using System;
+ using System.Collections.Generic;
+ using System.IO;
+ using System.Linq;
+ using Kafka.Client.Requests;
+ using Kafka.Client.Utils;
+ using NUnit.Framework;
+
///
/// Tests for the class.
///
@@ -14,41 +31,13 @@ namespace Kafka.Client.Request.Tests
public class MultiFetchRequestTests
{
///
- /// Tests a valid multi-consumer request.
- ///
- [Test]
- public void IsValidTrue()
- {
- List requests = new List
- {
- new FetchRequest("topic a", 0, 0),
- new FetchRequest("topic a", 0, 0),
- new FetchRequest("topic b", 0, 0),
- new FetchRequest("topic c", 0, 0)
- };
-
- MultiFetchRequest multiRequest = new MultiFetchRequest(requests);
- Assert.IsTrue(multiRequest.IsValid());
- }
-
- ///
- /// Tests for an invalid multi-request with no requests provided.
- ///
- [Test]
- public void IsValidNoRequests()
- {
- MultiFetchRequest multiRequest = new MultiFetchRequest(new List());
- Assert.IsFalse(multiRequest.IsValid());
- }
-
- ///
- /// Tests for an invalid multi-request with no requests provided.
+ /// Tests for an invalid multi-request constructor with no requests given.
///
[Test]
- public void IsValidNullRequests()
+ public void ThrowsExceptionWhenNullArgumentPassedToTheConstructor()
{
- MultiFetchRequest multiRequest = new MultiFetchRequest(null);
- Assert.IsFalse(multiRequest.IsValid());
+ MultiFetchRequest multiRequest;
+ Assert.Throws(() => multiRequest = new MultiFetchRequest(null));
}
///
@@ -69,7 +58,9 @@ namespace Kafka.Client.Request.Tests
// format = len(request) + requesttype + requestcount + requestpackage
// total byte count = 4 + (2 + 2 + 100)
- byte[] bytes = request.GetBytes();
+ MemoryStream ms = new MemoryStream();
+ request.WriteTo(ms);
+ byte[] bytes = ms.ToArray();
Assert.IsNotNull(bytes);
Assert.AreEqual(108, bytes.Length);
@@ -77,7 +68,7 @@ namespace Kafka.Client.Request.Tests
Assert.AreEqual(104, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Take(4).ToArray()), 0));
// next 2 bytes = the RequestType which in this case should be Produce
- Assert.AreEqual((short)RequestType.MultiFetch, BitConverter.ToInt16(BitWorks.ReverseBytes(bytes.Skip(4).Take(2).ToArray()), 0));
+ Assert.AreEqual((short)RequestTypes.MultiFetch, BitConverter.ToInt16(BitWorks.ReverseBytes(bytes.Skip(4).Take(2).ToArray()), 0));
// next 2 bytes = the number of messages
Assert.AreEqual((short)4, BitConverter.ToInt16(BitWorks.ReverseBytes(bytes.Skip(6).Take(2).ToArray()), 0));
diff --git clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/MultiProducerRequestTests.cs clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/MultiProducerRequestTests.cs
index b2699a4..1a68b45 100644
--- clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/MultiProducerRequestTests.cs
+++ clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/MultiProducerRequestTests.cs
@@ -1,12 +1,30 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using Kafka.Client.Request;
-using Kafka.Client.Util;
-using NUnit.Framework;
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
-namespace Kafka.Client.Request.Tests
+namespace Kafka.Client.Tests.Request
{
+ using System;
+ using System.Collections.Generic;
+ using System.IO;
+ using System.Linq;
+ using Kafka.Client.Messages;
+ using Kafka.Client.Requests;
+ using Kafka.Client.Utils;
+ using NUnit.Framework;
+
///
/// Tests for the class.
///
@@ -14,48 +32,10 @@ namespace Kafka.Client.Request.Tests
public class MultiProducerRequestTests
{
///
- /// Tests a valid multi-producer request.
- ///
- [Test]
- public void IsValidTrue()
- {
- List requests = new List
- {
- new ProducerRequest("topic a", 0, new List { new Message(new byte[10]) }),
- new ProducerRequest("topic a", 0, new List { new Message(new byte[10]) }),
- new ProducerRequest("topic b", 0, new List { new Message(new byte[10]) }),
- new ProducerRequest("topic c", 0, new List { new Message(new byte[10]) })
- };
-
- MultiProducerRequest multiRequest = new MultiProducerRequest(requests);
- Assert.IsTrue(multiRequest.IsValid());
- }
-
- ///
- /// Tests for an invalid multi-request with no requests provided.
- ///
- [Test]
- public void IsValidNoRequests()
- {
- MultiProducerRequest multiRequest = new MultiProducerRequest(new List());
- Assert.IsFalse(multiRequest.IsValid());
- }
-
- ///
- /// Tests for an invalid multi-request with no requests provided.
- ///
- [Test]
- public void IsValidNullRequests()
- {
- MultiProducerRequest multiRequest = new MultiProducerRequest(null);
- Assert.IsFalse(multiRequest.IsValid());
- }
-
- ///
/// Test to ensure a valid format in the returned byte array as expected by Kafka.
///
[Test]
- public void GetBytesValidFormat()
+ public void WriteToValidFormat()
{
List requests = new List
{
@@ -69,7 +49,9 @@ namespace Kafka.Client.Request.Tests
// format = len(request) + requesttype + requestcount + requestpackage
// total byte count = 4 + (2 + 2 + 144)
- byte[] bytes = request.GetBytes();
+ MemoryStream ms = new MemoryStream();
+ request.WriteTo(ms);
+ byte[] bytes = ms.ToArray();
Assert.IsNotNull(bytes);
Assert.AreEqual(152, bytes.Length);
@@ -77,7 +59,7 @@ namespace Kafka.Client.Request.Tests
Assert.AreEqual(148, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Take(4).ToArray()), 0));
// next 2 bytes = the RequestType which in this case should be Produce
- Assert.AreEqual((short)RequestType.MultiProduce, BitConverter.ToInt16(BitWorks.ReverseBytes(bytes.Skip(4).Take(2).ToArray()), 0));
+ Assert.AreEqual((short)RequestTypes.MultiProduce, BitConverter.ToInt16(BitWorks.ReverseBytes(bytes.Skip(4).Take(2).ToArray()), 0));
// next 2 bytes = the number of messages
Assert.AreEqual((short)4, BitConverter.ToInt16(BitWorks.ReverseBytes(bytes.Skip(6).Take(2).ToArray()), 0));
diff --git clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/OffsetRequestTests.cs clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/OffsetRequestTests.cs
index 4981635..0bba20c 100644
--- clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/OffsetRequestTests.cs
+++ clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/OffsetRequestTests.cs
@@ -1,12 +1,29 @@
-using System;
-using System.Linq;
-using System.Text;
-using Kafka.Client.Request;
-using Kafka.Client.Util;
-using NUnit.Framework;
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
-namespace Kafka.Client.Request.Tests
+namespace Kafka.Client.Tests.Request
{
+ using System;
+ using System.IO;
+ using System.Linq;
+ using System.Text;
+ using Kafka.Client.Requests;
+ using Kafka.Client.Utils;
+ using NUnit.Framework;
+
///
/// Tests the class.
///
@@ -14,36 +31,6 @@ namespace Kafka.Client.Request.Tests
public class OffsetRequestTests
{
///
- /// Tests a valid request.
- ///
- [Test]
- public void IsValidTrue()
- {
- FetchRequest request = new FetchRequest("topic", 1, 10L, 100);
- Assert.IsTrue(request.IsValid());
- }
-
- ///
- /// Tests a invalid request with no topic.
- ///
- [Test]
- public void IsValidNoTopic()
- {
- FetchRequest request = new FetchRequest(" ", 1, 10L, 100);
- Assert.IsFalse(request.IsValid());
- }
-
- ///
- /// Tests a invalid request with no topic.
- ///
- [Test]
- public void IsValidNulltopic()
- {
- FetchRequest request = new FetchRequest(null, 1, 10L, 100);
- Assert.IsFalse(request.IsValid());
- }
-
- ///
/// Validates the list of bytes meet Kafka expectations.
///
[Test]
@@ -54,7 +41,9 @@ namespace Kafka.Client.Request.Tests
// format = len(request) + requesttype + len(topic) + topic + partition + time + max
// total byte count = 4 + (2 + 2 + 5 + 4 + 8 + 4)
- byte[] bytes = request.GetBytes();
+ MemoryStream ms = new MemoryStream();
+ request.WriteTo(ms);
+ byte[] bytes = ms.ToArray();
Assert.IsNotNull(bytes);
Assert.AreEqual(29, bytes.Length);
@@ -62,7 +51,7 @@ namespace Kafka.Client.Request.Tests
Assert.AreEqual(25, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Take(4).ToArray()), 0));
// next 2 bytes = the RequestType which in this case should be Produce
- Assert.AreEqual((short)RequestType.Offsets, BitConverter.ToInt16(BitWorks.ReverseBytes(bytes.Skip(4).Take(2).ToArray()), 0));
+ Assert.AreEqual((short)RequestTypes.Offsets, BitConverter.ToInt16(BitWorks.ReverseBytes(bytes.Skip(4).Take(2).ToArray()), 0));
// next 2 bytes = the length of the topic
Assert.AreEqual((short)5, BitConverter.ToInt16(BitWorks.ReverseBytes(bytes.Skip(6).Take(2).ToArray()), 0));
diff --git clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/ProducerRequestTests.cs clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/ProducerRequestTests.cs
index be324cf..0976f96 100644
--- clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/ProducerRequestTests.cs
+++ clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/ProducerRequestTests.cs
@@ -1,13 +1,31 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using Kafka.Client.Request;
-using Kafka.Client.Util;
-using NUnit.Framework;
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
-namespace Kafka.Client.Request.Tests
+namespace Kafka.Client.Tests.Request
{
+ using System;
+ using System.Collections.Generic;
+ using System.IO;
+ using System.Linq;
+ using System.Text;
+ using Kafka.Client.Messages;
+ using Kafka.Client.Requests;
+ using Kafka.Client.Utils;
+ using NUnit.Framework;
+
///
/// Tests for the class.
///
@@ -15,57 +33,29 @@ namespace Kafka.Client.Request.Tests
public class ProducerRequestTests
{
///
- /// Tests a valid producer request.
- ///
- [Test]
- public void IsValidTrue()
- {
- ProducerRequest request = new ProducerRequest(
- "topic", 0, new List { new Message(new byte[10]) });
- Assert.IsTrue(request.IsValid());
- }
-
- ///
- /// Tests a invalid producer request with no topic.
- ///
- [Test]
- public void IsValidFalseNoTopic()
- {
- ProducerRequest request = new ProducerRequest(null, 0, null);
- Assert.IsFalse(request.IsValid());
- }
-
- ///
- /// Tests a invalid producer request with no messages to send.
- ///
- [Test]
- public void IsValidFalseNoMessages()
- {
- ProducerRequest request = new ProducerRequest("topic", 0, null);
- Assert.IsFalse(request.IsValid());
- }
-
- ///
/// Test to ensure a valid format in the returned byte array as expected by Kafka.
///
[Test]
- public void GetBytesValidFormat()
+ public void WriteToValidFormat()
{
string topicName = "topic";
ProducerRequest request = new ProducerRequest(
topicName, 0, new List { new Message(new byte[10]) });
// format = len(request) + requesttype + len(topic) + topic + partition + len(messagepack) + message
- // total byte count = 4 + (2 + 2 + 5 + 4 + 4 + 19)
- byte[] bytes = request.GetBytes();
+ // total byte count = (4 + 2 + 2 + 5 + 4 + 4 + 19)
+ System.IO.MemoryStream ms = new MemoryStream();
+ request.WriteTo(ms);
+
+ byte[] bytes = ms.ToArray();
Assert.IsNotNull(bytes);
Assert.AreEqual(40, bytes.Length);
- // first 4 bytes = the length of the request
+ // next 4 bytes = the length of the request
Assert.AreEqual(36, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Take(4).ToArray()), 0));
// next 2 bytes = the RequestType which in this case should be Produce
- Assert.AreEqual((short)RequestType.Produce, BitConverter.ToInt16(BitWorks.ReverseBytes(bytes.Skip(4).Take(2).ToArray()), 0));
+ Assert.AreEqual((short)RequestTypes.Produce, BitConverter.ToInt16(BitWorks.ReverseBytes(bytes.Skip(4).Take(2).ToArray()), 0));
// next 2 bytes = the length of the topic
Assert.AreEqual((short)5, BitConverter.ToInt16(BitWorks.ReverseBytes(bytes.Skip(6).Take(2).ToArray()), 0));
diff --git clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Util/BitWorksTests.cs clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Util/BitWorksTests.cs
index 307680c..f7948a1 100644
--- clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Util/BitWorksTests.cs
+++ clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Util/BitWorksTests.cs
@@ -1,9 +1,25 @@
-using System;
-using Kafka.Client.Util;
-using NUnit.Framework;
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
namespace Kafka.Client.Tests.Util
{
+ using System;
+ using Kafka.Client.Utils;
+ using NUnit.Framework;
+
///
/// Tests for utility class.
///