1+ /*
2+ * Copyright 2018 The Feast Authors
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License");
5+ * you may not use this file except in compliance with the License.
6+ * You may obtain a copy of the License at
7+ *
8+ * https://www.apache.org/licenses/LICENSE-2.0
9+ *
10+ * Unless required by applicable law or agreed to in writing, software
11+ * distributed under the License is distributed on an "AS IS" BASIS,
12+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+ * See the License for the specific language governing permissions and
14+ * limitations under the License.
15+ *
16+ */
17+
118package feast .ingestion .deserializer ;
219
320import com .google .protobuf .MessageLite ;
421import feast .types .FeatureRowProto .FeatureRow ;
522import java .util .Map ;
6- import java .util .UUID ;
723import java .util .concurrent .BlockingQueue ;
824import java .util .concurrent .ExecutionException ;
925import java .util .concurrent .LinkedBlockingQueue ;
1329import org .apache .kafka .common .serialization .ByteArraySerializer ;
1430import org .apache .kafka .common .serialization .Deserializer ;
1531import org .junit .Assert ;
32+ import org .junit .ClassRule ;
1633import org .junit .Test ;
1734import org .junit .runner .RunWith ;
1835import org .springframework .beans .factory .annotation .Autowired ;
36+ import org .springframework .boot .test .context .SpringBootTest ;
1937import org .springframework .context .annotation .Bean ;
2038import org .springframework .context .annotation .Configuration ;
2139import org .springframework .kafka .core .ConsumerFactory ;
2846import org .springframework .kafka .listener .MessageListener ;
2947import org .springframework .kafka .listener .MessageListenerContainer ;
3048import org .springframework .kafka .support .SendResult ;
31- import org .springframework .kafka .test .EmbeddedKafkaBroker ;
32- import org .springframework .kafka .test .context .EmbeddedKafka ;
49+ import org .springframework .kafka .test .rule .EmbeddedKafkaRule ;
3350import org .springframework .kafka .test .utils .ContainerTestUtils ;
3451import org .springframework .kafka .test .utils .KafkaTestUtils ;
52+ import org .springframework .test .annotation .DirtiesContext ;
3553import org .springframework .test .context .junit4 .SpringRunner ;
3654import org .springframework .util .concurrent .ListenableFuture ;
3755
3856@ RunWith (SpringRunner .class )
39- @ EmbeddedKafka (controlledShutdown = true )
57+ @ SpringBootTest
58+ @ DirtiesContext
4059public class KafkaFeatureRowDeserializerTest {
4160
42- @ Autowired private EmbeddedKafkaBroker embeddedKafka ;
61+ private static final String topic = "TEST_TOPIC" ;
62+
63+ @ ClassRule public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule (1 , true , topic );
4364 @ Autowired private KafkaTemplate <byte [], byte []> template ;
4465
4566 private <MessageType extends MessageLite > void deserialize (MessageType input ) {
46- // generate a random UUID to create a unique topic and consumer group id for each test
47- String uuid = UUID .randomUUID ().toString ();
48- String topic = "topic-" + uuid ;
49-
50- embeddedKafka .addTopics (topic );
5167
5268 Deserializer deserializer = new FeatureRowDeserializer ();
5369
5470 Map <String , Object > consumerProps =
55- KafkaTestUtils .consumerProps (uuid , Boolean . FALSE . toString () , embeddedKafka );
71+ KafkaTestUtils .consumerProps ("testGroup" , "false" , embeddedKafka . getEmbeddedKafka () );
5672 ConsumerFactory <FeatureRow , FeatureRow > consumerFactory =
5773 new DefaultKafkaConsumerFactory <>(consumerProps , deserializer , deserializer );
5874
@@ -63,7 +79,8 @@ private <MessageType extends MessageLite> void deserialize(MessageType input) {
6379 MessageListenerContainer container =
6480 new KafkaMessageListenerContainer <>(consumerFactory , containerProps );
6581 container .start ();
66- ContainerTestUtils .waitForAssignment (container , embeddedKafka .getPartitionsPerTopic ());
82+ ContainerTestUtils .waitForAssignment (
83+ container , embeddedKafka .getEmbeddedKafka ().getPartitionsPerTopic ());
6784
6885 byte [] data = input .toByteArray ();
6986 ProducerRecord <byte [], byte []> producerRecord = new ProducerRecord <>(topic , data , data );
@@ -99,12 +116,10 @@ public void deserializeFeatureRowProto() {
99116
100117 @ Configuration
101118 static class ContextConfiguration {
102-
103- @ Autowired private EmbeddedKafkaBroker embeddedKafka ;
104-
105119 @ Bean
106120 ProducerFactory <byte [], byte []> producerFactory () {
107- Map <String , Object > producerProps = KafkaTestUtils .producerProps (embeddedKafka );
121+ Map <String , Object > producerProps =
122+ KafkaTestUtils .producerProps (embeddedKafka .getEmbeddedKafka ());
108123
109124 return new DefaultKafkaProducerFactory <>(
110125 producerProps , new ByteArraySerializer (), new ByteArraySerializer ());
0 commit comments