Commit 232cd48e authored by mohammad.salama's avatar mohammad.salama

Initial Producer with jxon

parents
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<module name="Producer" />
</profile>
</annotationProcessing>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/src/main/resources" charset="UTF-8" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="RemoteRepositoriesConfiguration">
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Central Repository" />
<option name="url" value="https://repo.maven.apache.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Maven Central repository" />
<option name="url" value="https://repo1.maven.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="jboss.community" />
<option name="name" value="JBoss Community repository" />
<option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
</remote-repository>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_17" default="true" project-jdk-name="17 (2)" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="AutoImportSettings">
<option name="autoReloadType" value="SELECTIVE" />
</component>
<component name="ChangeListManager">
<list default="true" id="9704b422-94d9-4d0a-8879-40f13e57ea0e" name="Changes" comment="">
<change afterPath="$PROJECT_DIR$/.gitignore" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/encodings.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/misc.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/vcs.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/pom.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/main/java/org/example/Customer.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/main/java/org/example/MyProducer.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/main/java/org/example/TransactionInfo.java" afterDir="false" />
</list>
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
<option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" />
<option name="LAST_RESOLUTION" value="IGNORE" />
</component>
<component name="Git.Settings">
<option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$" />
</component>
<component name="MarkdownSettingsMigration">
<option name="stateVersion" value="1" />
</component>
<component name="ProjectId" id="2e2sLVK6J00R8bDdDrKOkN2pFuV" />
<component name="ProjectLevelVcsManager" settingsEditedManually="true" />
<component name="ProjectViewState">
<option name="hideEmptyMiddlePackages" value="true" />
<option name="showLibraryContents" value="true" />
</component>
<component name="PropertiesComponent"><![CDATA[{
"keyToString": {
"RunOnceActivity.OpenProjectViewOnStart": "true",
"RunOnceActivity.ShowReadmeOnStart": "true",
"SHARE_PROJECT_CONFIGURATION_FILES": "true",
"last_opened_file_path": "D:/HIAST/FIY/SS/Advanced Distributed Systems/Lab/4/java-kafka-producer/Producer/Producer/src/main/java/org/example",
"settings.editor.selected.configurable": "reference.settings.project.maven.repository.indices"
}
}]]></component>
<component name="RecentsManager">
<key name="CopyFile.RECENT_KEYS">
<recent name="D:\HIAST\FIY\SS\Advanced Distributed Systems\Lab\4\java-kafka-producer\Producer\Producer\src\main\java\org\example" />
</key>
</component>
<component name="SpellCheckerSettings" RuntimeDictionaries="0" Folders="0" CustomDictionaries="0" DefaultDictionary="application-level" UseSingleDictionary="true" transferred="true" />
<component name="TaskManager">
<task active="true" id="Default" summary="Default task">
<changelist id="9704b422-94d9-4d0a-8879-40f13e57ea0e" name="Changes" comment="" />
<created>1711113954202</created>
<option name="number" value="Default" />
<option name="presentableId" value="Default" />
<updated>1711113954202</updated>
</task>
<servers />
</component>
<component name="Vcs.Log.Tabs.Properties">
<option name="TAB_STATES">
<map>
<entry key="MAIN">
<value>
<State />
</value>
</entry>
</map>
</option>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>Producer</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.32</version>
</dependency>
</dependencies>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>
\ No newline at end of file
package org.example;
import java.nio.DoubleBuffer;
public class Customer
{
private String name;
private String originalCountry;
private double cash;
public Customer (String name , String originalCountry , double cash)
{
this.name = name;
this.originalCountry = originalCountry;
this.cash = cash;
}
public double getCash()
{
return cash;
}
public String getName()
{
return name;
}
public String getOriginalCountry()
{
return originalCountry;
}
public boolean withdrawMoney (double amount)
{
cash -= amount;
if (cash < 0)
{
cash += amount;
return false;
}
return true;
}
public boolean depositMoney (double amount)
{
if (cash + amount < cash) return false;
cash += amount;
return true;
}
}
package org.example;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.Stack;
import java.util.concurrent.ExecutionException;
public class MyProducer
{
private static final String TOPIC = "transaction";
private static final String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094";
private static final Logger log = LoggerFactory.getLogger(MyProducer.class);
public static boolean logTransaction(TransactionInfo transactionInfo)
{
log.info("Registering A Transaction for Mr/Ms : " + transactionInfo.getName());
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = objectMapper.convertValue(transactionInfo , JsonNode.class);
ObjectNode jsonTransactionInfo = (ObjectNode) jsonNode;
Producer<Long, ObjectNode> kafkaProducer = createKafkaProducer(BOOTSTRAP_SERVERS);
long timeStamp = System.currentTimeMillis();
ProducerRecord<Long , ObjectNode> record = new ProducerRecord<>(TOPIC , timeStamp ,jsonTransactionInfo);
RecordMetadata recordMetadata = null;
try {
recordMetadata = kafkaProducer.send(record).get();
} catch (InterruptedException | ExecutionException e)
{
log.error("Could Not Send Data to Kafka , Or Could Not Get Meta-Data");
return false;
}
log.info((String.format("Record for Mr/Ms : %s - with (key: %s, value: %s), " +
"was sent to (partition: %d, offset: %d)",
transactionInfo.getName(),record.key(),
record.value(), recordMetadata.partition(),
recordMetadata.offset())));
kafkaProducer.flush();
kafkaProducer.close();
return true;
}
public static Producer<Long, ObjectNode> createKafkaProducer(String bootstrapServers)
{
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "events-producer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
return new KafkaProducer<>(properties);
}
}
package org.example;
public class TransactionInfo
{
private String name;
private String location;
private double amount;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
public double getAmount() {
return amount;
}
public void setAmount(double amount)
{
this.amount = amount;
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment