Shares Aggregation problem (Chapter 5) #4

Open

pdawczak opened this issue Jan 27, 2019 · 0 comments

First of all - thank you for this amazing book!

I've run into a problem when running the example that transforms "Stock Transactions" into "Share Volume". With the example code from GitHub, it compiles but fails with the following exception:

Exception in thread "Kafka-Streams-Tables-client-SHARES-StreamThread-1" java.lang.ClassCastException: com.google.gson.internal.LinkedTreeMap cannot be cast to java.lang.Comparable
	at java.util.TreeMap.compare(TreeMap.java:1294)
	at java.util.TreeMap.put(TreeMap.java:538)
	at java.util.TreeSet.add(TreeSet.java:255)
	at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:83)
	at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:61)
	at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$1.read(ReflectiveTypeAdapterFactory.java:131)
	at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:222)
	at com.google.gson.Gson.fromJson(Gson.java:927)
	at com.google.gson.Gson.fromJson(Gson.java:892)
	at com.google.gson.Gson.fromJson(Gson.java:841)
	at com.google.gson.Gson.fromJson(Gson.java:813)
	at com.example.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:33)
	at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:159)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.outerValue(MeteredKeyValueStore.java:245)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:137)
	at org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:78)
	at org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:56)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:351)
	at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:104)
	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:413)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:862)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
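
For what it's worth, I can trigger what looks like the same failure with plain Gson, outside of Kafka Streams. This is only an illustration (the Holder class and the JSON are made up, not the book's code), but it shows Gson turning the elements of a TreeSet into LinkedTreeMaps when it can't see a concrete element type, and TreeSet.add then failing exactly as in the stack trace above:

import com.google.gson.Gson;
import java.util.TreeSet;

public class GsonTreeSetRepro {

    // Stand-in for a class wrapping a TreeSet whose element type Gson
    // cannot see at runtime (type erasure) -- for illustration only.
    static class Holder {
        TreeSet<Object> entries = new TreeSet<>();
    }

    public static void main(String[] args) {
        String json = "{\"entries\":[{\"symbol\":\"ABC\",\"shares\":100}]}";

        // Gson only knows the element type as Object, so each entry is
        // deserialized as a LinkedTreeMap; TreeSet.add(...) then calls
        // compare(...), which casts to Comparable and throws the same
        // ClassCastException shown above.
        new Gson().fromJson(json, Holder.class);
    }
}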

My application is configured with the following properties:

Properties props = new Properties();
props.put(StreamsConfig.CLIENT_ID_CONFIG, "Kafka-Streams-Tables-client-SHARES");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-streams-tables-group-SHARES");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "Kafka-Streams-Tables-App-SHARES");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

When I ran the debugger around the problem, it looks like Gson struggles to decode the FixedSizePriorityQueue (or, rather, its internal TreeSet).

Is there anything you could recommend for me to try solving the issue?
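
For completeness, the only workaround I've come up with so far is registering a custom com.google.gson.JsonDeserializer for the queue type when the Gson instance is built, so that Gson never has to rebuild the internal TreeSet reflectively. I haven't verified this, and I'm guessing at the FixedSizePriorityQueue constructor and add() signatures, at the comparator, and at the serialized field name, so please treat it as a rough sketch rather than working code:

// Rough sketch only -- assumes the value is deserialized as the raw
// FixedSizePriorityQueue.class, and guesses at its (Comparator, maxSize)
// constructor, its add(...) method, and the "inner" field name.
Gson gson = new GsonBuilder()
        .registerTypeAdapter(FixedSizePriorityQueue.class,
                (com.google.gson.JsonDeserializer<FixedSizePriorityQueue<ShareVolume>>) (json, type, ctx) -> {
                    FixedSizePriorityQueue<ShareVolume> queue =
                            new FixedSizePriorityQueue<>(
                                    Comparator.comparingInt(ShareVolume::getShares).reversed(), 5);
                    // Rebuild the queue element by element with the concrete
                    // ShareVolume type, instead of letting Gson reconstruct
                    // the TreeSet with LinkedTreeMap entries.
                    for (JsonElement element : json.getAsJsonObject().getAsJsonArray("inner")) {
                        queue.add(ctx.deserialize(element, ShareVolume.class));
                    }
                    return queue;
                })
        .create();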

Thank you for your help and great contributions!
